diff options
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/data_node.go | 1 | ||||
| -rw-r--r-- | weed/topology/rack.go | 69 | ||||
| -rw-r--r-- | weed/topology/topology_test.go | 119 |
3 files changed, 178 insertions, 11 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 4f2dbe464..07e00ac0a 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -269,6 +269,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { Id: string(dn.Id()), DiskInfos: make(map[string]*master_pb.DiskInfo), GrpcPort: uint32(dn.GrpcPort), + Address: dn.Url(), // ip:port for connecting to the volume server } for _, c := range dn.Children() { disk := c.(*Disk) diff --git a/weed/topology/rack.go b/weed/topology/rack.go index f526cd84d..1e5c8b632 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -5,6 +5,7 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" @@ -34,17 +35,73 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode { } return nil } -func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode { + +// FindDataNodeById finds a DataNode by its ID using O(1) map lookup +func (r *Rack) FindDataNodeById(id string) *DataNode { + r.RLock() + defer r.RUnlock() + if c, ok := r.children[NodeId(id)]; ok { + return c.(*DataNode) + } + return nil +} + +func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode { r.Lock() defer r.Unlock() - for _, c := range r.children { + + // Normalize the id parameter (trim whitespace) + id = strings.TrimSpace(id) + + // Determine the node ID: use provided id, or fall back to ip:port for backward compatibility + nodeId := util.GetVolumeServerId(id, ip, port) + + // First, try to find by node ID using O(1) map lookup (stable identity) + if c, ok := r.children[NodeId(nodeId)]; ok { dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - dn.LastSeen = time.Now().Unix() - return dn + // Log if IP or Port changed (e.g., pod rescheduled in K8s) + if dn.Ip != ip || dn.Port != port { + glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port) } + // Update the IP/Port in case they changed + dn.Ip = ip + dn.Port = port + dn.GrpcPort = grpcPort + dn.PublicUrl = publicUrl + dn.LastSeen = time.Now().Unix() + return dn } - dn := NewDataNode(util.JoinHostPort(ip, port)) + + // For backward compatibility: if explicit id was provided, also check by ip:port + // to handle transition from old (ip:port) to new (explicit id) behavior + ipPortId := util.JoinHostPort(ip, port) + if nodeId != ipPortId { + for oldId, c := range r.children { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + // Only transition if the oldId exactly matches ip:port (legacy identification). + // If oldId is different, this is a node with an explicit id that happens to + // reuse the same ip:port - don't incorrectly merge them. + if string(oldId) != ipPortId { + glog.Warningf("Volume server with id %s has ip:port %s which is used by node %s", nodeId, ipPortId, oldId) + continue + } + // Found a legacy node identified by ip:port, transition it to use the new explicit id + glog.V(0).Infof("Volume server %s transitioning id from %s to %s", dn.Url(), oldId, nodeId) + // Re-key the node in the children map with the new id + delete(r.children, oldId) + dn.id = NodeId(nodeId) + r.children[NodeId(nodeId)] = dn + // Update connection info in case they changed + dn.GrpcPort = grpcPort + dn.PublicUrl = publicUrl + dn.LastSeen = time.Now().Unix() + return dn + } + } + } + + dn := NewDataNode(nodeId) dn.Ip = ip dn.Port = port dn.GrpcPort = grpcPort diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go index 8515d2f81..e5a8969fc 100644 --- a/weed/topology/topology_test.go +++ b/weed/topology/topology_test.go @@ -34,7 +34,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) { maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 maxVolumeCounts["ssd"] = 12 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) { volumeCount := 7 @@ -180,7 +180,7 @@ func TestAddRemoveVolume(t *testing.T) { maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 maxVolumeCounts["ssd"] = 12 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) v := storage.VolumeInfo{ Id: needle.VolumeId(1), @@ -218,7 +218,7 @@ func TestVolumeReadOnlyStatusChange(t *testing.T) { rack := dc.GetOrCreateRack("rack1") maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) // Create a writable volume v := storage.VolumeInfo{ @@ -267,7 +267,7 @@ func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) { rack := dc.GetOrCreateRack("rack1") maxVolumeCounts := make(map[string]uint32) maxVolumeCounts[""] = 25 - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts) // Create a writable, local volume v := storage.VolumeInfo{ @@ -331,7 +331,7 @@ func TestListCollections(t *testing.T) { topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) dc := topo.GetOrCreateDataCenter("dc1") rack := dc.GetOrCreateRack("rack1") - dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil) + dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", nil) topo.RegisterVolumeLayout(storage.VolumeInfo{ Id: needle.VolumeId(1111), @@ -396,3 +396,112 @@ func TestListCollections(t *testing.T) { }) } } + +func TestDataNodeIdBasedIdentification(t *testing.T) { + topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false) + dc := topo.GetOrCreateDataCenter("dc1") + rack := dc.GetOrCreateRack("rack1") + + maxVolumeCounts := make(map[string]uint32) + maxVolumeCounts[""] = 10 + + // Test 1: Create a DataNode with explicit id + dn1 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-1", maxVolumeCounts) + if string(dn1.Id()) != "node-1" { + t.Errorf("expected node id 'node-1', got '%s'", dn1.Id()) + } + if dn1.Ip != "10.0.0.1" { + t.Errorf("expected ip '10.0.0.1', got '%s'", dn1.Ip) + } + + // Test 2: Same id with different IP should return the same DataNode (K8s pod reschedule scenario) + dn2 := rack.GetOrCreateDataNode("10.0.0.2", 8080, 18080, "10.0.0.2:8080", "node-1", maxVolumeCounts) + if dn1 != dn2 { + t.Errorf("expected same DataNode for same id, got different nodes") + } + // IP should be updated to the new value + if dn2.Ip != "10.0.0.2" { + t.Errorf("expected ip to be updated to '10.0.0.2', got '%s'", dn2.Ip) + } + if dn2.PublicUrl != "10.0.0.2:8080" { + t.Errorf("expected publicUrl to be updated to '10.0.0.2:8080', got '%s'", dn2.PublicUrl) + } + + // Test 3: Different id should create a new DataNode + dn3 := rack.GetOrCreateDataNode("10.0.0.3", 8080, 18080, "10.0.0.3:8080", "node-2", maxVolumeCounts) + if string(dn3.Id()) != "node-2" { + t.Errorf("expected node id 'node-2', got '%s'", dn3.Id()) + } + if dn1 == dn3 { + t.Errorf("expected different DataNode for different id") + } + + // Test 4: Empty id should fall back to ip:port (backward compatibility) + dn4 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts) + if string(dn4.Id()) != "10.0.0.4:8080" { + t.Errorf("expected node id '10.0.0.4:8080' for empty id, got '%s'", dn4.Id()) + } + + // Test 5: Same ip:port with empty id should return the same DataNode + dn5 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts) + if dn4 != dn5 { + t.Errorf("expected same DataNode for same ip:port with empty id") + } + + // Verify we have 3 unique DataNodes total: + // - node-1 (dn1/dn2 share the same id) + // - node-2 (dn3) + // - 10.0.0.4:8080 (dn4/dn5 share the same ip:port) + children := rack.Children() + if len(children) != 3 { + t.Errorf("expected 3 DataNodes, got %d", len(children)) + } + + // Test 6: Transition from ip:port to explicit id + // First, the node exists with ip:port as id (dn4/dn5) + // Now the same volume server starts sending an explicit id + dn6 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "node-4-explicit", maxVolumeCounts) + // Should return the same DataNode instance + if dn6 != dn4 { + t.Errorf("expected same DataNode instance during transition") + } + // But the id should now be updated to the explicit id + if string(dn6.Id()) != "node-4-explicit" { + t.Errorf("expected node id to transition to 'node-4-explicit', got '%s'", dn6.Id()) + } + // The node should be re-keyed in the children map + if rack.FindDataNodeById("node-4-explicit") != dn6 { + t.Errorf("expected to find DataNode by new explicit id") + } + // Old ip:port key should no longer work + if rack.FindDataNodeById("10.0.0.4:8080") != nil { + t.Errorf("expected old ip:port id to be removed from children map") + } + + // Still 3 unique DataNodes (node-1, node-2, node-4-explicit) + children = rack.Children() + if len(children) != 3 { + t.Errorf("expected 3 DataNodes after transition, got %d", len(children)) + } + + // Test 7: Prevent incorrect transition when a new node reuses ip:port of a node with explicit id + // Scenario: node-1 runs at 10.0.0.1:8080, dies, new node-99 starts at same ip:port + // The transition should NOT happen because node-1 already has an explicit id + dn7 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-99", maxVolumeCounts) + // Should create a NEW DataNode, not reuse node-1 + if dn7 == dn1 { + t.Errorf("expected new DataNode for node-99, got reused node-1") + } + if string(dn7.Id()) != "node-99" { + t.Errorf("expected node id 'node-99', got '%s'", dn7.Id()) + } + // node-1 should still exist with its original id + if rack.FindDataNodeById("node-1") == nil { + t.Errorf("node-1 should still exist") + } + // Now we have 4 DataNodes + children = rack.Children() + if len(children) != 4 { + t.Errorf("expected 4 DataNodes, got %d", len(children)) + } +} |
