aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/rack.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/rack.go')
-rw-r--r--weed/topology/rack.go69
1 files changed, 63 insertions, 6 deletions
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index f526cd84d..1e5c8b632 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -5,6 +5,7 @@ import (
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -34,17 +35,73 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
+
+// FindDataNodeById finds a DataNode by its ID using O(1) map lookup
+func (r *Rack) FindDataNodeById(id string) *DataNode {
+ r.RLock()
+ defer r.RUnlock()
+ if c, ok := r.children[NodeId(id)]; ok {
+ return c.(*DataNode)
+ }
+ return nil
+}
+
+func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode {
r.Lock()
defer r.Unlock()
- for _, c := range r.children {
+
+ // Normalize the id parameter (trim whitespace)
+ id = strings.TrimSpace(id)
+
+ // Determine the node ID: use provided id, or fall back to ip:port for backward compatibility
+ nodeId := util.GetVolumeServerId(id, ip, port)
+
+ // First, try to find by node ID using O(1) map lookup (stable identity)
+ if c, ok := r.children[NodeId(nodeId)]; ok {
dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- dn.LastSeen = time.Now().Unix()
- return dn
+ // Log if IP or Port changed (e.g., pod rescheduled in K8s)
+ if dn.Ip != ip || dn.Port != port {
+ glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port)
}
+ // Update the IP/Port in case they changed
+ dn.Ip = ip
+ dn.Port = port
+ dn.GrpcPort = grpcPort
+ dn.PublicUrl = publicUrl
+ dn.LastSeen = time.Now().Unix()
+ return dn
}
- dn := NewDataNode(util.JoinHostPort(ip, port))
+
+ // For backward compatibility: if explicit id was provided, also check by ip:port
+ // to handle transition from old (ip:port) to new (explicit id) behavior
+ ipPortId := util.JoinHostPort(ip, port)
+ if nodeId != ipPortId {
+ for oldId, c := range r.children {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ // Only transition if the oldId exactly matches ip:port (legacy identification).
+ // If oldId is different, this is a node with an explicit id that happens to
+ // reuse the same ip:port - don't incorrectly merge them.
+ if string(oldId) != ipPortId {
+ glog.Warningf("Volume server with id %s has ip:port %s which is used by node %s", nodeId, ipPortId, oldId)
+ continue
+ }
+ // Found a legacy node identified by ip:port, transition it to use the new explicit id
+ glog.V(0).Infof("Volume server %s transitioning id from %s to %s", dn.Url(), oldId, nodeId)
+ // Re-key the node in the children map with the new id
+ delete(r.children, oldId)
+ dn.id = NodeId(nodeId)
+ r.children[NodeId(nodeId)] = dn
+ // Update connection info in case they changed
+ dn.GrpcPort = grpcPort
+ dn.PublicUrl = publicUrl
+ dn.LastSeen = time.Now().Unix()
+ return dn
+ }
+ }
+ }
+
+ dn := NewDataNode(nodeId)
dn.Ip = ip
dn.Port = port
dn.GrpcPort = grpcPort