Diffstat (limited to 'weed/topology/upgrade_interop_test.go')
| -rw-r--r-- | weed/topology/upgrade_interop_test.go | 473 |
1 files changed, 473 insertions, 0 deletions
diff --git a/weed/topology/upgrade_interop_test.go b/weed/topology/upgrade_interop_test.go
new file mode 100644
index 000000000..96cfce029
--- /dev/null
+++ b/weed/topology/upgrade_interop_test.go
@@ -0,0 +1,473 @@
+package topology
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/sequence"
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+	testAssert "github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestPreUpgradeNodeCompatibility tests that pre-upgrade nodes (without generation support)
+// can continue working with the new generation-aware system
+func TestPreUpgradeNodeCompatibility(t *testing.T) {
+	t.Run("pre_upgrade_heartbeat_processing", func(t *testing.T) {
+		// Test that heartbeats from pre-upgrade volume servers are processed correctly
+
+		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+		volumeId := needle.VolumeId(456)
+
+		// Simulate heartbeat from pre-upgrade volume server (generation=0)
+		ecShardInfo := &master_pb.VolumeEcShardInformationMessage{
+			Id:          uint32(volumeId),
+			Collection:  "test",
+			EcIndexBits: uint32(0x3FFF), // all 14 shards
+			DiskType:    "hdd",
+			Generation:  0, // Pre-upgrade server sends generation 0
+		}
+
+		dc := topo.GetOrCreateDataCenter("dc1")
+		rack := dc.GetOrCreateRack("rack1")
+		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+
+		// Process heartbeat - should work fine
+		topo.SyncDataNodeEcShards([]*master_pb.VolumeEcShardInformationMessage{ecShardInfo}, dn)
+
+		// Verify it was registered
+		locations, found := topo.LookupEcShards(volumeId, 0)
+		require.True(t, found, "Pre-upgrade server EC shards should be registered")
+		testAssert.Equal(t, uint32(0), locations.Generation, "Should be registered as generation 0")
+
+		t.Logf("✅ Pre-upgrade server heartbeat processed: volume %d generation %d",
+			volumeId, locations.Generation)
+	})
+
+	t.Run("pre_upgrade_lookup_fallback", func(t *testing.T) {
+		// Test that pre-upgrade clients can lookup volumes using generation 0
+
+		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+		volumeId := needle.VolumeId(123)
+
+		// Register generation 2 shards
+		ecInfo := &erasure_coding.EcVolumeInfo{
+			VolumeId:   volumeId,
+			Collection: "test",
+			ShardBits:  erasure_coding.ShardBits(0x3FFF), // all 14 shards
+			Generation: 2,
+		}
+
+		dc := topo.GetOrCreateDataCenter("dc1")
+		rack := dc.GetOrCreateRack("rack1")
+		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+		topo.RegisterEcShards(ecInfo, dn)
+
+		// Set generation 2 as active
+		topo.SetEcActiveGeneration(volumeId, 2)
+
+		// Pre-upgrade client looks up with generation 0
+		locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+
+		require.True(t, found, "Pre-upgrade client should find EC volume")
+		testAssert.Equal(t, uint32(2), actualGen, "Should return active generation")
+		testAssert.Equal(t, uint32(2), locations.Generation, "Locations should be for active generation")
+
+		t.Logf("✅ Pre-upgrade client lookup: requested gen=0, got active gen=%d", actualGen)
+	})
+}
+
+// TestPostUpgradeNodeCompatibility tests that post-upgrade nodes (with generation support)
+// can handle legacy data from pre-upgrade nodes
+func TestPostUpgradeNodeCompatibility(t *testing.T) {
+	t.Run("post_upgrade_handles_legacy_data", func(t *testing.T) {
+		// Test that new generation-aware nodes can handle legacy generation 0 data
+
+		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+		volumeId := needle.VolumeId(789)
+
+		// Register legacy generation 0 EC volume (from pre-upgrade)
+		legacyEcInfo := &erasure_coding.EcVolumeInfo{
+			VolumeId:   volumeId,
+			Collection: "test",
+			ShardBits:  erasure_coding.ShardBits(0x3FFF),
+			Generation: 0, // Legacy data
+		}
+
+		dc := topo.GetOrCreateDataCenter("dc1")
+		rack := dc.GetOrCreateRack("rack1")
+		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+		topo.RegisterEcShards(legacyEcInfo, dn)
+
+		// Post-upgrade client with generation support looks up the volume
+		// When no active generation is set, should fallback to whatever is available
+		locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+
+		require.True(t, found, "Post-upgrade node should find legacy data")
+		testAssert.Equal(t, uint32(0), actualGen, "Should return generation 0 for legacy data")
+		testAssert.Equal(t, uint32(0), locations.Generation, "Locations should be generation 0")
+
+		t.Logf("✅ Post-upgrade node handles legacy data: found gen=%d", actualGen)
+	})
+
+	t.Run("post_upgrade_prefers_active_generation", func(t *testing.T) {
+		// Test that post-upgrade nodes prefer active generation over legacy
+
+		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+		volumeId := needle.VolumeId(999)
+
+		dc := topo.GetOrCreateDataCenter("dc1")
+		rack := dc.GetOrCreateRack("rack1")
+		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+
+		// Register both legacy (gen 0) and new (gen 1) data
+		legacyEcInfo := &erasure_coding.EcVolumeInfo{
+			VolumeId:   volumeId,
+			Collection: "test",
+			ShardBits:  erasure_coding.ShardBits(0x3FFF),
+			Generation: 0,
+		}
+		topo.RegisterEcShards(legacyEcInfo, dn)
+
+		newEcInfo := &erasure_coding.EcVolumeInfo{
+			VolumeId:   volumeId,
+			Collection: "test",
+			ShardBits:  erasure_coding.ShardBits(0x3FFF),
+			Generation: 1,
+		}
+		topo.RegisterEcShards(newEcInfo, dn)
+
+		// Set generation 1 as active
+		topo.SetEcActiveGeneration(volumeId, 1)
+
+		// Post-upgrade client lookup should prefer active generation
+		locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+
+		require.True(t, found, "Should find volume")
+		testAssert.Equal(t, uint32(1), actualGen, "Should prefer active generation over legacy")
+		testAssert.Equal(t, uint32(1), locations.Generation, "Locations should be active generation")
+
+		t.Logf("✅ Post-upgrade node prefers active: legacy=0, active=1, returned=%d", actualGen)
+	})
+
+	t.Run("post_upgrade_strict_generation_requests", func(t *testing.T) {
+		// Test that post-upgrade clients can make strict generation requests
+
+		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+		volumeId := needle.VolumeId(555)
+
+		dc := topo.GetOrCreateDataCenter("dc1")
+		rack := dc.GetOrCreateRack("rack1")
+		dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+
+		// Register multiple generations
+		for gen := uint32(0); gen <= 2; gen++ {
+			ecInfo := &erasure_coding.EcVolumeInfo{
+				VolumeId:   volumeId,
+				Collection: "test",
+				ShardBits:  erasure_coding.ShardBits(0x3FFF),
+				Generation: gen,
+			}
+			topo.RegisterEcShards(ecInfo, dn)
+		}
+
+		// Test strict generation requests
+		for requestedGen := uint32(0); requestedGen <= 2; requestedGen++ {
+			locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, requestedGen)
+
+			if requestedGen == 0 {
+				// Generation 0 requests use active generation logic
+				require.True(t, found, "Generation 0 request should find volume")
+			} else {
+				// Specific generation requests should return exact match
+				require.True(t, found, "Specific generation request should find exact match")
+				testAssert.Equal(t, requestedGen, actualGen, "Should return exact requested generation")
+				testAssert.Equal(t, requestedGen, locations.Generation, "Locations should match requested generation")
+			}
+		}
+
+		t.Logf("✅ Post-upgrade strict requests work for all generations")
+	})
+}
+
+// TestMixedClusterOperations tests operations in a mixed cluster
+// where some nodes are pre-upgrade and some are post-upgrade
+func TestMixedClusterOperations(t *testing.T) {
+	t.Run("mixed_cluster_shard_distribution", func(t *testing.T) {
+		// Test that EC shards can be distributed across mixed-version nodes
+
+		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+		volumeId := needle.VolumeId(777)
+
+		dc := topo.GetOrCreateDataCenter("dc1")
+		rack := dc.GetOrCreateRack("rack1")
+
+		// Pre-upgrade node (sends generation 0)
+		preUpgradeNode := rack.GetOrCreateDataNode("pre-upgrade", 8080, 0, "127.0.0.1", nil)
+
+		// Post-upgrade node (sends specific generation)
+		postUpgradeNode := rack.GetOrCreateDataNode("post-upgrade", 8081, 0, "127.0.0.2", nil)
+
+		// Pre-upgrade node reports shards with generation 0
+		preUpgradeShards := &erasure_coding.EcVolumeInfo{
+			VolumeId:   volumeId,
+			Collection: "test",
+			ShardBits:  erasure_coding.ShardBits(0x1FF), // shards 0-8
+			Generation: 0,
+		}
+		topo.RegisterEcShards(preUpgradeShards, preUpgradeNode)
+
+		// Post-upgrade node reports shards with generation 1
+		postUpgradeShards := &erasure_coding.EcVolumeInfo{
+			VolumeId:   volumeId,
+			Collection: "test",
+			ShardBits:  erasure_coding.ShardBits(0x3E00), // shards 9-13
+			Generation: 1,
+		}
+		topo.RegisterEcShards(postUpgradeShards, postUpgradeNode)
+
+		// Verify both generations are registered
+		gen0Locations, found0 := topo.LookupEcShards(volumeId, 0)
+		gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
+
+		require.True(t, found0, "Generation 0 shards should be registered")
+		require.True(t, found1, "Generation 1 shards should be registered")
+
+		gen0ShardCount := countShards(gen0Locations)
+		gen1ShardCount := countShards(gen1Locations)
+
+		testAssert.Equal(t, 9, gen0ShardCount, "Pre-upgrade node should have 9 shards")
+		testAssert.Equal(t, 5, gen1ShardCount, "Post-upgrade node should have 5 shards")
+
+		t.Logf("✅ Mixed cluster shard distribution: gen0=%d shards, gen1=%d shards",
+			gen0ShardCount, gen1ShardCount)
+	})
+}
+
+// TestRollingUpgradeScenarios tests specific rolling upgrade scenarios
+func TestRollingUpgradeScenarios(t *testing.T) {
+	t.Run("rolling_upgrade_sequence", func(t *testing.T) {
+		// Test the complete rolling upgrade sequence
+
+		topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+		volumeId := needle.VolumeId(123)
+
+		dc := topo.GetOrCreateDataCenter("dc1")
+		rack := dc.GetOrCreateRack("rack1")
+
+		// Create 6 nodes representing a cluster during rolling upgrade
+		nodes := make([]*DataNode, 6)
+		for i := 0; i < 6; i++ {
+			nodes[i] = rack.GetOrCreateDataNode(fmt.Sprintf("node%d", i), 8080+i, 0, fmt.Sprintf("127.0.0.%d", i+1), nil)
+		}
+
+		// Phase 1: All nodes are pre-upgrade (generation 0)
+		t.Run("phase1_all_pre_upgrade", func(t *testing.T) {
+			for i, node := range nodes {
+				ecInfo := &erasure_coding.EcVolumeInfo{
+					VolumeId:   volumeId,
+					Collection: "test",
+					ShardBits:  erasure_coding.ShardBits(1 << i), // Each node has one shard
+					Generation: 0,
+				}
+				topo.RegisterEcShards(ecInfo, node)
+			}
+
+			// Verify all shards are generation 0
+			locations, found := topo.LookupEcShards(volumeId, 0)
+			require.True(t, found, "Should find generation 0 volume")
+			testAssert.Equal(t, 6, countShards(locations), "Should have 6 shards")
+
+			t.Logf("✅ Phase 1: All 6 nodes running pre-upgrade with generation 0")
+		})
+
+		// Phase 2: Partially upgraded cluster (3 nodes upgraded)
+		t.Run("phase2_partial_upgrade", func(t *testing.T) {
+			// Nodes 3-5 are upgraded and now understand generations
+			// They re-register their shards as generation 1
+			for i := 3; i < 6; i++ {
+				// Unregister old generation 0 shard
+				oldEcInfo := &erasure_coding.EcVolumeInfo{
+					VolumeId:   volumeId,
+					Collection: "test",
+					ShardBits:  erasure_coding.ShardBits(1 << i),
+					Generation: 0,
+				}
+				topo.UnRegisterEcShards(oldEcInfo, nodes[i])
+
+				// Register new generation 1 shard
+				newEcInfo := &erasure_coding.EcVolumeInfo{
+					VolumeId:   volumeId,
+					Collection: "test",
+					ShardBits:  erasure_coding.ShardBits(1 << i),
+					Generation: 1,
+				}
+				topo.RegisterEcShards(newEcInfo, nodes[i])
+			}
+
+			// Verify mixed generations
+			gen0Locations, found0 := topo.LookupEcShards(volumeId, 0)
+			gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
+
+			require.True(t, found0, "Should still have generation 0 shards")
+			require.True(t, found1, "Should have generation 1 shards")
+
+			testAssert.Equal(t, 3, countShards(gen0Locations), "Should have 3 gen 0 shards")
+			testAssert.Equal(t, 3, countShards(gen1Locations), "Should have 3 gen 1 shards")
+
+			t.Logf("✅ Phase 2: Mixed cluster - 3 nodes gen 0, 3 nodes gen 1")
+		})
+
+		// Phase 3: Fully upgraded cluster
+		t.Run("phase3_full_upgrade", func(t *testing.T) {
+			// Remaining nodes 0-2 are upgraded
+			for i := 0; i < 3; i++ {
+				// Unregister old generation 0 shard
+				oldEcInfo := &erasure_coding.EcVolumeInfo{
+					VolumeId:   volumeId,
+					Collection: "test",
+					ShardBits:  erasure_coding.ShardBits(1 << i),
+					Generation: 0,
+				}
+				topo.UnRegisterEcShards(oldEcInfo, nodes[i])
+
+				// Register new generation 1 shard
+				newEcInfo := &erasure_coding.EcVolumeInfo{
+					VolumeId:   volumeId,
+					Collection: "test",
+					ShardBits:  erasure_coding.ShardBits(1 << i),
+					Generation: 1,
+				}
+				topo.RegisterEcShards(newEcInfo, nodes[i])
+			}
+
+			// Set generation 1 as active
+			topo.SetEcActiveGeneration(volumeId, 1)
+
+			// Verify only generation 1 remains
+			_, found0 := topo.LookupEcShards(volumeId, 0)
+			gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
+
+			testAssert.False(t, found0, "Should no longer have generation 0 shards")
+			require.True(t, found1, "Should have generation 1 shards")
+			testAssert.Equal(t, 6, countShards(gen1Locations), "Should have all 6 gen 1 shards")
+
+			// Test that lookups now prefer generation 1
+			_, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+			require.True(t, found, "Should find volume")
+			testAssert.Equal(t, uint32(1), actualGen, "Should return active generation 1")
+
+			t.Logf("✅ Phase 3: All nodes upgraded to generation 1, old generation cleaned up")
+		})
+	})
+}
+
+// TestGenerationCompatibilityMatrix tests all combinations of client/server generations
+func TestGenerationCompatibilityMatrix(t *testing.T) {
+	// Test matrix of generation compatibility for various upgrade scenarios
+	testCases := []struct {
+		name               string
+		clientType         string
+		serverGeneration   uint32
+		requestGeneration  uint32
+		shouldBeCompatible bool
+		description        string
+	}{
+		{
+			name:               "pre_client_to_pre_server",
+			clientType:         "pre-upgrade",
+			serverGeneration:   0,
+			requestGeneration:  0,
+			shouldBeCompatible: true,
+			description:        "Pre-upgrade client to pre-upgrade server",
+		},
+		{
+			name:               "pre_client_to_post_server_gen1",
+			clientType:         "pre-upgrade",
+			serverGeneration:   1,
+			requestGeneration:  0,
+			shouldBeCompatible: true,
+			description:        "Pre-upgrade client to generation 1 server",
+		},
+		{
+			name:               "pre_client_to_post_server_gen2",
+			clientType:         "pre-upgrade",
+			serverGeneration:   2,
+			requestGeneration:  0,
+			shouldBeCompatible: true,
+			description:        "Pre-upgrade client to generation 2 server",
+		},
+		{
+			name:               "post_client_exact_match",
+			clientType:         "post-upgrade",
+			serverGeneration:   1,
+			requestGeneration:  1,
+			shouldBeCompatible: true,
+			description:        "Post-upgrade client exact generation match",
+		},
+		{
+			name:               "post_client_strict_mismatch",
+			clientType:         "post-upgrade",
+			serverGeneration:   0,
+			requestGeneration:  1,
+			shouldBeCompatible: false,
+			description:        "Post-upgrade client strict mismatch",
+		},
+		{
+			name:               "post_client_legacy_request",
+			clientType:         "post-upgrade",
+			serverGeneration:   1,
+			requestGeneration:  0,
+			shouldBeCompatible: true,
+			description:        "Post-upgrade client with legacy request",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Use the isGenerationCompatible function from volume_grpc_erasure_coding.go
+			compatible := isGenerationCompatible(tc.serverGeneration, tc.requestGeneration)
+
+			testAssert.Equal(t, tc.shouldBeCompatible, compatible, tc.description)
+
+			if compatible {
+				t.Logf("✅ %s: server_gen=%d, request_gen=%d → COMPATIBLE",
+					tc.description, tc.serverGeneration, tc.requestGeneration)
+			} else {
+				t.Logf("❌ %s: server_gen=%d, request_gen=%d → INCOMPATIBLE",
+					tc.description, tc.serverGeneration, tc.requestGeneration)
+			}
+		})
+	}
+}
+
+// Helper function to count shards in EcShardLocations
+func countShards(locations *EcShardLocations) int {
+	count := 0
+	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+		if len(locations.Locations[i]) > 0 {
+			count++
+		}
+	}
+	return count
+}
+
+// Helper function to simulate isGenerationCompatible from volume_grpc_erasure_coding.go
+func isGenerationCompatible(actualGeneration, requestedGeneration uint32) bool {
+	// Exact match is always compatible
+	if actualGeneration == requestedGeneration {
+		return true
+	}
+
+	// Mixed-version compatibility: if client requests generation 0 (default/legacy),
+	// allow access to any generation for backward compatibility
+	if requestedGeneration == 0 {
+		return true
+	}
+
+	// If client requests specific generation but volume has different generation,
+	// this is not compatible (strict generation matching)
+	return false
+}
