aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/data_node.go1
-rw-r--r--weed/topology/rack.go69
-rw-r--r--weed/topology/topology_test.go119
3 files changed, 178 insertions, 11 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 4f2dbe464..07e00ac0a 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -269,6 +269,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
Id: string(dn.Id()),
DiskInfos: make(map[string]*master_pb.DiskInfo),
GrpcPort: uint32(dn.GrpcPort),
+ Address: dn.Url(), // ip:port for connecting to the volume server
}
for _, c := range dn.Children() {
disk := c.(*Disk)
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index f526cd84d..1e5c8b632 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -5,6 +5,7 @@ import (
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -34,17 +35,73 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
+
+// FindDataNodeById finds a DataNode by its ID using O(1) map lookup
+func (r *Rack) FindDataNodeById(id string) *DataNode {
+ r.RLock()
+ defer r.RUnlock()
+ if c, ok := r.children[NodeId(id)]; ok {
+ return c.(*DataNode)
+ }
+ return nil
+}
+
+func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode {
r.Lock()
defer r.Unlock()
- for _, c := range r.children {
+
+ // Normalize the id parameter (trim whitespace)
+ id = strings.TrimSpace(id)
+
+ // Determine the node ID: use provided id, or fall back to ip:port for backward compatibility
+ nodeId := util.GetVolumeServerId(id, ip, port)
+
+ // First, try to find by node ID using O(1) map lookup (stable identity)
+ if c, ok := r.children[NodeId(nodeId)]; ok {
dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- dn.LastSeen = time.Now().Unix()
- return dn
+ // Log if IP or Port changed (e.g., pod rescheduled in K8s)
+ if dn.Ip != ip || dn.Port != port {
+ glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port)
}
+ // Update the IP/Port in case they changed
+ dn.Ip = ip
+ dn.Port = port
+ dn.GrpcPort = grpcPort
+ dn.PublicUrl = publicUrl
+ dn.LastSeen = time.Now().Unix()
+ return dn
}
- dn := NewDataNode(util.JoinHostPort(ip, port))
+
+ // For backward compatibility: if explicit id was provided, also check by ip:port
+ // to handle transition from old (ip:port) to new (explicit id) behavior
+ ipPortId := util.JoinHostPort(ip, port)
+ if nodeId != ipPortId {
+ for oldId, c := range r.children {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ // Only transition if the oldId exactly matches ip:port (legacy identification).
+ // If oldId is different, this is a node with an explicit id that happens to
+ // reuse the same ip:port - don't incorrectly merge them.
+ if string(oldId) != ipPortId {
+ glog.Warningf("Volume server with id %s has ip:port %s which is used by node %s", nodeId, ipPortId, oldId)
+ continue
+ }
+ // Found a legacy node identified by ip:port, transition it to use the new explicit id
+ glog.V(0).Infof("Volume server %s transitioning id from %s to %s", dn.Url(), oldId, nodeId)
+ // Re-key the node in the children map with the new id
+ delete(r.children, oldId)
+ dn.id = NodeId(nodeId)
+ r.children[NodeId(nodeId)] = dn
+ // Update connection info in case they changed
+ dn.GrpcPort = grpcPort
+ dn.PublicUrl = publicUrl
+ dn.LastSeen = time.Now().Unix()
+ return dn
+ }
+ }
+ }
+
+ dn := NewDataNode(nodeId)
dn.Ip = ip
dn.Port = port
dn.GrpcPort = grpcPort
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 8515d2f81..e5a8969fc 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -34,7 +34,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
{
volumeCount := 7
@@ -180,7 +180,7 @@ func TestAddRemoveVolume(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
@@ -218,7 +218,7 @@ func TestVolumeReadOnlyStatusChange(t *testing.T) {
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
// Create a writable volume
v := storage.VolumeInfo{
@@ -267,7 +267,7 @@ func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) {
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
// Create a writable, local volume
v := storage.VolumeInfo{
@@ -331,7 +331,7 @@ func TestListCollections(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", nil)
topo.RegisterVolumeLayout(storage.VolumeInfo{
Id: needle.VolumeId(1111),
@@ -396,3 +396,112 @@ func TestListCollections(t *testing.T) {
})
}
}
+
+func TestDataNodeIdBasedIdentification(t *testing.T) {
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+
+ maxVolumeCounts := make(map[string]uint32)
+ maxVolumeCounts[""] = 10
+
+ // Test 1: Create a DataNode with explicit id
+ dn1 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-1", maxVolumeCounts)
+ if string(dn1.Id()) != "node-1" {
+ t.Errorf("expected node id 'node-1', got '%s'", dn1.Id())
+ }
+ if dn1.Ip != "10.0.0.1" {
+ t.Errorf("expected ip '10.0.0.1', got '%s'", dn1.Ip)
+ }
+
+ // Test 2: Same id with different IP should return the same DataNode (K8s pod reschedule scenario)
+ dn2 := rack.GetOrCreateDataNode("10.0.0.2", 8080, 18080, "10.0.0.2:8080", "node-1", maxVolumeCounts)
+ if dn1 != dn2 {
+ t.Errorf("expected same DataNode for same id, got different nodes")
+ }
+ // IP should be updated to the new value
+ if dn2.Ip != "10.0.0.2" {
+ t.Errorf("expected ip to be updated to '10.0.0.2', got '%s'", dn2.Ip)
+ }
+ if dn2.PublicUrl != "10.0.0.2:8080" {
+ t.Errorf("expected publicUrl to be updated to '10.0.0.2:8080', got '%s'", dn2.PublicUrl)
+ }
+
+ // Test 3: Different id should create a new DataNode
+ dn3 := rack.GetOrCreateDataNode("10.0.0.3", 8080, 18080, "10.0.0.3:8080", "node-2", maxVolumeCounts)
+ if string(dn3.Id()) != "node-2" {
+ t.Errorf("expected node id 'node-2', got '%s'", dn3.Id())
+ }
+ if dn1 == dn3 {
+ t.Errorf("expected different DataNode for different id")
+ }
+
+ // Test 4: Empty id should fall back to ip:port (backward compatibility)
+ dn4 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts)
+ if string(dn4.Id()) != "10.0.0.4:8080" {
+ t.Errorf("expected node id '10.0.0.4:8080' for empty id, got '%s'", dn4.Id())
+ }
+
+ // Test 5: Same ip:port with empty id should return the same DataNode
+ dn5 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts)
+ if dn4 != dn5 {
+ t.Errorf("expected same DataNode for same ip:port with empty id")
+ }
+
+ // Verify we have 3 unique DataNodes total:
+ // - node-1 (dn1/dn2 share the same id)
+ // - node-2 (dn3)
+ // - 10.0.0.4:8080 (dn4/dn5 share the same ip:port)
+ children := rack.Children()
+ if len(children) != 3 {
+ t.Errorf("expected 3 DataNodes, got %d", len(children))
+ }
+
+ // Test 6: Transition from ip:port to explicit id
+ // First, the node exists with ip:port as id (dn4/dn5)
+ // Now the same volume server starts sending an explicit id
+ dn6 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "node-4-explicit", maxVolumeCounts)
+ // Should return the same DataNode instance
+ if dn6 != dn4 {
+ t.Errorf("expected same DataNode instance during transition")
+ }
+ // But the id should now be updated to the explicit id
+ if string(dn6.Id()) != "node-4-explicit" {
+ t.Errorf("expected node id to transition to 'node-4-explicit', got '%s'", dn6.Id())
+ }
+ // The node should be re-keyed in the children map
+ if rack.FindDataNodeById("node-4-explicit") != dn6 {
+ t.Errorf("expected to find DataNode by new explicit id")
+ }
+ // Old ip:port key should no longer work
+ if rack.FindDataNodeById("10.0.0.4:8080") != nil {
+ t.Errorf("expected old ip:port id to be removed from children map")
+ }
+
+ // Still 3 unique DataNodes (node-1, node-2, node-4-explicit)
+ children = rack.Children()
+ if len(children) != 3 {
+ t.Errorf("expected 3 DataNodes after transition, got %d", len(children))
+ }
+
+ // Test 7: Prevent incorrect transition when a new node reuses ip:port of a node with explicit id
+ // Scenario: node-1 runs at 10.0.0.1:8080, dies, new node-99 starts at same ip:port
+ // The transition should NOT happen because node-1 already has an explicit id
+ dn7 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-99", maxVolumeCounts)
+ // Should create a NEW DataNode, not reuse node-1
+ if dn7 == dn1 {
+ t.Errorf("expected new DataNode for node-99, got reused node-1")
+ }
+ if string(dn7.Id()) != "node-99" {
+ t.Errorf("expected node id 'node-99', got '%s'", dn7.Id())
+ }
+ // node-1 should still exist with its original id
+ if rack.FindDataNodeById("node-1") == nil {
+ t.Errorf("node-1 should still exist")
+ }
+ // Now we have 4 DataNodes
+ children = rack.Children()
+ if len(children) != 4 {
+ t.Errorf("expected 4 DataNodes, got %d", len(children))
+ }
+}