Diffstat (limited to 'weed/topology/upgrade_interop_test.go')
-rw-r--r--   weed/topology/upgrade_interop_test.go   473
1 file changed, 473 insertions(+), 0 deletions(-)
diff --git a/weed/topology/upgrade_interop_test.go b/weed/topology/upgrade_interop_test.go
new file mode 100644
index 000000000..96cfce029
--- /dev/null
+++ b/weed/topology/upgrade_interop_test.go
@@ -0,0 +1,473 @@
+package topology
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/sequence"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ testAssert "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestPreUpgradeNodeCompatibility tests that pre-upgrade nodes (without generation support)
+// can continue working with the new generation-aware system
+func TestPreUpgradeNodeCompatibility(t *testing.T) {
+ t.Run("pre_upgrade_heartbeat_processing", func(t *testing.T) {
+ // Test that heartbeats from pre-upgrade volume servers are processed correctly
+
+ topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ volumeId := needle.VolumeId(456)
+
+ // Simulate heartbeat from pre-upgrade volume server (generation=0)
+ ecShardInfo := &master_pb.VolumeEcShardInformationMessage{
+ Id: uint32(volumeId),
+ Collection: "test",
+ EcIndexBits: uint32(0x3FFF), // all 14 shards
+ DiskType: "hdd",
+ Generation: 0, // Pre-upgrade server sends generation 0
+ }
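+ // (For reference: 0x3FFF is binary 11111111111111, i.e. bits 0-13 set, which
+ // reports all erasure_coding.TotalShardsCount = 14 shards as present.)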
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+ dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+
+ // Process the heartbeat - it should register the shards without error
+ topo.SyncDataNodeEcShards([]*master_pb.VolumeEcShardInformationMessage{ecShardInfo}, dn)
+
+ // Verify it was registered
+ locations, found := topo.LookupEcShards(volumeId, 0)
+ require.True(t, found, "Pre-upgrade server EC shards should be registered")
+ testAssert.Equal(t, uint32(0), locations.Generation, "Should be registered as generation 0")
+
+ t.Logf("✅ Pre-upgrade server heartbeat processed: volume %d generation %d",
+ volumeId, locations.Generation)
+ })
+
+ t.Run("pre_upgrade_lookup_fallback", func(t *testing.T) {
+ // Test that pre-upgrade clients can lookup volumes using generation 0
+
+ topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ volumeId := needle.VolumeId(123)
+
+ // Register generation 2 shards
+ ecInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(0x3FFF), // all 14 shards
+ Generation: 2,
+ }
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+ dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+ topo.RegisterEcShards(ecInfo, dn)
+
+ // Set generation 2 as active
+ topo.SetEcActiveGeneration(volumeId, 2)
+
+ // Pre-upgrade client looks up with generation 0
+ locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+
+ require.True(t, found, "Pre-upgrade client should find EC volume")
+ testAssert.Equal(t, uint32(2), actualGen, "Should return active generation")
+ testAssert.Equal(t, uint32(2), locations.Generation, "Locations should be for active generation")
+
+ t.Logf("✅ Pre-upgrade client lookup: requested gen=0, got active gen=%d", actualGen)
+ })
+}
+
+// TestPostUpgradeNodeCompatibility tests that post-upgrade nodes (with generation support)
+// can handle legacy data from pre-upgrade nodes
+func TestPostUpgradeNodeCompatibility(t *testing.T) {
+ t.Run("post_upgrade_handles_legacy_data", func(t *testing.T) {
+ // Test that new generation-aware nodes can handle legacy generation 0 data
+
+ topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ volumeId := needle.VolumeId(789)
+
+ // Register legacy generation 0 EC volume (from pre-upgrade)
+ legacyEcInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(0x3FFF),
+ Generation: 0, // Legacy data
+ }
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+ dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+ topo.RegisterEcShards(legacyEcInfo, dn)
+
+ // Post-upgrade client with generation support looks up the volume
+ // When no active generation is set, the lookup should fall back to whatever is available
+ locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+
+ require.True(t, found, "Post-upgrade node should find legacy data")
+ testAssert.Equal(t, uint32(0), actualGen, "Should return generation 0 for legacy data")
+ testAssert.Equal(t, uint32(0), locations.Generation, "Locations should be generation 0")
+
+ t.Logf("✅ Post-upgrade node handles legacy data: found gen=%d", actualGen)
+ })
+
+ t.Run("post_upgrade_prefers_active_generation", func(t *testing.T) {
+ // Test that post-upgrade nodes prefer active generation over legacy
+
+ topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ volumeId := needle.VolumeId(999)
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+ dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+
+ // Register both legacy (gen 0) and new (gen 1) data
+ legacyEcInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(0x3FFF),
+ Generation: 0,
+ }
+ topo.RegisterEcShards(legacyEcInfo, dn)
+
+ newEcInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(0x3FFF),
+ Generation: 1,
+ }
+ topo.RegisterEcShards(newEcInfo, dn)
+
+ // Set generation 1 as active
+ topo.SetEcActiveGeneration(volumeId, 1)
+
+ // Post-upgrade client lookup should prefer active generation
+ locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+
+ require.True(t, found, "Should find volume")
+ testAssert.Equal(t, uint32(1), actualGen, "Should prefer active generation over legacy")
+ testAssert.Equal(t, uint32(1), locations.Generation, "Locations should be active generation")
+
+ t.Logf("✅ Post-upgrade node prefers active: legacy=0, active=1, returned=%d", actualGen)
+ })
+
+ t.Run("post_upgrade_strict_generation_requests", func(t *testing.T) {
+ // Test that post-upgrade clients can make strict generation requests
+
+ topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ volumeId := needle.VolumeId(555)
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+ dn := rack.GetOrCreateDataNode("server1", 8080, 0, "127.0.0.1", nil)
+
+ // Register multiple generations
+ for gen := uint32(0); gen <= 2; gen++ {
+ ecInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(0x3FFF),
+ Generation: gen,
+ }
+ topo.RegisterEcShards(ecInfo, dn)
+ }
+
+ // Test strict generation requests
+ for requestedGen := uint32(0); requestedGen <= 2; requestedGen++ {
+ locations, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, requestedGen)
+
+ if requestedGen == 0 {
+ // Generation 0 requests use active generation logic
+ require.True(t, found, "Generation 0 request should find volume")
+ } else {
+ // Specific generation requests should return exact match
+ require.True(t, found, "Specific generation request should find exact match")
+ testAssert.Equal(t, requestedGen, actualGen, "Should return exact requested generation")
+ testAssert.Equal(t, requestedGen, locations.Generation, "Locations should match requested generation")
+ }
+ }
+
+ t.Logf("✅ Post-upgrade strict requests work for all generations")
+ })
+}
+
+// TestMixedClusterOperations tests operations in a mixed cluster
+// where some nodes are pre-upgrade and some are post-upgrade
+func TestMixedClusterOperations(t *testing.T) {
+ t.Run("mixed_cluster_shard_distribution", func(t *testing.T) {
+ // Test that EC shards can be distributed across mixed-version nodes
+
+ topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ volumeId := needle.VolumeId(777)
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+
+ // Pre-upgrade node (sends generation 0)
+ preUpgradeNode := rack.GetOrCreateDataNode("pre-upgrade", 8080, 0, "127.0.0.1", nil)
+
+ // Post-upgrade node (sends specific generation)
+ postUpgradeNode := rack.GetOrCreateDataNode("post-upgrade", 8081, 0, "127.0.0.2", nil)
+
+ // Pre-upgrade node reports shards with generation 0
+ preUpgradeShards := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(0x1FF), // shards 0-8
+ Generation: 0,
+ }
+ topo.RegisterEcShards(preUpgradeShards, preUpgradeNode)
+
+ // Post-upgrade node reports shards with generation 1
+ postUpgradeShards := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(0x3E00), // shards 9-13
+ Generation: 1,
+ }
+ topo.RegisterEcShards(postUpgradeShards, postUpgradeNode)
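+ // (Shard arithmetic for the assertions below: 0x1FF sets bits 0-8, i.e. 9
+ // shards; 0x3E00 sets bits 9-13, i.e. 5 shards; together all 14 shards.)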
+
+ // Verify both generations are registered
+ gen0Locations, found0 := topo.LookupEcShards(volumeId, 0)
+ gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
+
+ require.True(t, found0, "Generation 0 shards should be registered")
+ require.True(t, found1, "Generation 1 shards should be registered")
+
+ gen0ShardCount := countShards(gen0Locations)
+ gen1ShardCount := countShards(gen1Locations)
+
+ testAssert.Equal(t, 9, gen0ShardCount, "Pre-upgrade node should have 9 shards")
+ testAssert.Equal(t, 5, gen1ShardCount, "Post-upgrade node should have 5 shards")
+
+ t.Logf("✅ Mixed cluster shard distribution: gen0=%d shards, gen1=%d shards",
+ gen0ShardCount, gen1ShardCount)
+ })
+}
+
+// TestRollingUpgradeScenarios tests specific rolling upgrade scenarios
+func TestRollingUpgradeScenarios(t *testing.T) {
+ t.Run("rolling_upgrade_sequence", func(t *testing.T) {
+ // Test the complete rolling upgrade sequence
+
+ topo := NewTopology("test", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ volumeId := needle.VolumeId(123)
+
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+
+ // Create 6 nodes representing a cluster during rolling upgrade
+ nodes := make([]*DataNode, 6)
+ for i := 0; i < 6; i++ {
+ nodes[i] = rack.GetOrCreateDataNode(fmt.Sprintf("node%d", i), 8080+i, 0, fmt.Sprintf("127.0.0.%d", i+1), nil)
+ }
+
+ // Phase 1: All nodes are pre-upgrade (generation 0)
+ t.Run("phase1_all_pre_upgrade", func(t *testing.T) {
+ for i, node := range nodes {
+ ecInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(1 << i), // Each node has one shard
+ Generation: 0,
+ }
+ topo.RegisterEcShards(ecInfo, node)
+ }
+
+ // Verify all shards are generation 0
+ locations, found := topo.LookupEcShards(volumeId, 0)
+ require.True(t, found, "Should find generation 0 volume")
+ testAssert.Equal(t, 6, countShards(locations), "Should have 6 shards")
+
+ t.Logf("✅ Phase 1: All 6 nodes running pre-upgrade with generation 0")
+ })
+
+ // Phase 2: Partially upgraded cluster (3 nodes upgraded)
+ t.Run("phase2_partial_upgrade", func(t *testing.T) {
+ // Nodes 3-5 are upgraded and now understand generations
+ // They re-register their shards as generation 1
+ for i := 3; i < 6; i++ {
+ // Unregister old generation 0 shard
+ oldEcInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(1 << i),
+ Generation: 0,
+ }
+ topo.UnRegisterEcShards(oldEcInfo, nodes[i])
+
+ // Register new generation 1 shard
+ newEcInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(1 << i),
+ Generation: 1,
+ }
+ topo.RegisterEcShards(newEcInfo, nodes[i])
+ }
+
+ // Verify mixed generations
+ gen0Locations, found0 := topo.LookupEcShards(volumeId, 0)
+ gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
+
+ require.True(t, found0, "Should still have generation 0 shards")
+ require.True(t, found1, "Should have generation 1 shards")
+
+ testAssert.Equal(t, 3, countShards(gen0Locations), "Should have 3 gen 0 shards")
+ testAssert.Equal(t, 3, countShards(gen1Locations), "Should have 3 gen 1 shards")
+
+ t.Logf("✅ Phase 2: Mixed cluster - 3 nodes gen 0, 3 nodes gen 1")
+ })
+
+ // Phase 3: Fully upgraded cluster
+ t.Run("phase3_full_upgrade", func(t *testing.T) {
+ // Remaining nodes 0-2 are upgraded
+ for i := 0; i < 3; i++ {
+ // Unregister old generation 0 shard
+ oldEcInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(1 << i),
+ Generation: 0,
+ }
+ topo.UnRegisterEcShards(oldEcInfo, nodes[i])
+
+ // Register new generation 1 shard
+ newEcInfo := &erasure_coding.EcVolumeInfo{
+ VolumeId: volumeId,
+ Collection: "test",
+ ShardBits: erasure_coding.ShardBits(1 << i),
+ Generation: 1,
+ }
+ topo.RegisterEcShards(newEcInfo, nodes[i])
+ }
+
+ // Set generation 1 as active
+ topo.SetEcActiveGeneration(volumeId, 1)
+
+ // Verify only generation 1 remains
+ _, found0 := topo.LookupEcShards(volumeId, 0)
+ gen1Locations, found1 := topo.LookupEcShards(volumeId, 1)
+
+ testAssert.False(t, found0, "Should no longer have generation 0 shards")
+ require.True(t, found1, "Should have generation 1 shards")
+ testAssert.Equal(t, 6, countShards(gen1Locations), "Should have all 6 gen 1 shards")
+
+ // Test that lookups now prefer generation 1
+ _, actualGen, found := topo.LookupEcShardsWithFallback(volumeId, 0)
+ require.True(t, found, "Should find volume")
+ testAssert.Equal(t, uint32(1), actualGen, "Should return active generation 1")
+
+ t.Logf("✅ Phase 3: All nodes upgraded to generation 1, old generation cleaned up")
+ })
+ })
+}
+
+// TestGenerationCompatibilityMatrix tests all combinations of client/server generations
+func TestGenerationCompatibilityMatrix(t *testing.T) {
+ // Test matrix of generation compatibility for various upgrade scenarios
+ testCases := []struct {
+ name string
+ clientType string
+ serverGeneration uint32
+ requestGeneration uint32
+ shouldBeCompatible bool
+ description string
+ }{
+ {
+ name: "pre_client_to_pre_server",
+ clientType: "pre-upgrade",
+ serverGeneration: 0,
+ requestGeneration: 0,
+ shouldBeCompatible: true,
+ description: "Pre-upgrade client to pre-upgrade server",
+ },
+ {
+ name: "pre_client_to_post_server_gen1",
+ clientType: "pre-upgrade",
+ serverGeneration: 1,
+ requestGeneration: 0,
+ shouldBeCompatible: true,
+ description: "Pre-upgrade client to generation 1 server",
+ },
+ {
+ name: "pre_client_to_post_server_gen2",
+ clientType: "pre-upgrade",
+ serverGeneration: 2,
+ requestGeneration: 0,
+ shouldBeCompatible: true,
+ description: "Pre-upgrade client to generation 2 server",
+ },
+ {
+ name: "post_client_exact_match",
+ clientType: "post-upgrade",
+ serverGeneration: 1,
+ requestGeneration: 1,
+ shouldBeCompatible: true,
+ description: "Post-upgrade client exact generation match",
+ },
+ {
+ name: "post_client_strict_mismatch",
+ clientType: "post-upgrade",
+ serverGeneration: 0,
+ requestGeneration: 1,
+ shouldBeCompatible: false,
+ description: "Post-upgrade client strict mismatch",
+ },
+ {
+ name: "post_client_legacy_request",
+ clientType: "post-upgrade",
+ serverGeneration: 1,
+ requestGeneration: 0,
+ shouldBeCompatible: true,
+ description: "Post-upgrade client with legacy request",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Uses the local isGenerationCompatible helper below, which mirrors the check in volume_grpc_erasure_coding.go
+ compatible := isGenerationCompatible(tc.serverGeneration, tc.requestGeneration)
+
+ testAssert.Equal(t, tc.shouldBeCompatible, compatible, tc.description)
+
+ if compatible {
+ t.Logf("✅ %s: server_gen=%d, request_gen=%d → COMPATIBLE",
+ tc.description, tc.serverGeneration, tc.requestGeneration)
+ } else {
+ t.Logf("❌ %s: server_gen=%d, request_gen=%d → INCOMPATIBLE",
+ tc.description, tc.serverGeneration, tc.requestGeneration)
+ }
+ })
+ }
+}
+
+// countShards counts how many shard ids in the given EcShardLocations have at least one registered location
+func countShards(locations *EcShardLocations) int {
+ count := 0
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ if len(locations.Locations[i]) > 0 {
+ count++
+ }
+ }
+ return count
+}
+
+// isGenerationCompatible mirrors the generation compatibility check from volume_grpc_erasure_coding.go
+func isGenerationCompatible(actualGeneration, requestedGeneration uint32) bool {
+ // Exact match is always compatible
+ if actualGeneration == requestedGeneration {
+ return true
+ }
+
+ // Mixed-version compatibility: if client requests generation 0 (default/legacy),
+ // allow access to any generation for backward compatibility
+ if requestedGeneration == 0 {
+ return true
+ }
+
+ // If client requests specific generation but volume has different generation,
+ // this is not compatible (strict generation matching)
+ return false
+}
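+
+// exampleRejectIncompatibleGeneration is an illustrative sketch only, not the
+// production request path: it shows how a request handler could apply the same
+// rule that isGenerationCompatible encodes when validating an EC shard read.
+// The function name and error wording here are hypothetical.
+func exampleRejectIncompatibleGeneration(volumeGeneration, requestedGeneration uint32) error {
+ if isGenerationCompatible(volumeGeneration, requestedGeneration) {
+ return nil
+ }
+ return fmt.Errorf("ec volume generation mismatch: have generation %d, request asked for %d",
+ volumeGeneration, requestedGeneration)
+}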