1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
package topology
import (
"slices"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// Rack is a topology node that embeds NodeImpl and therefore inherits its
// identity, child bookkeeping and locking. The methods defined on Rack in this
// file type-assert every child to *DataNode, so a Rack is expected to hold
// only DataNode children.
type Rack struct {
NodeImpl
}
// NewRack builds an empty Rack identified by id.
//
// The embedded NodeImpl is initialised with node type "Rack", fresh disk-usage
// and capacity-reservation trackers, and an empty children map. Its value
// field is pointed back at the rack itself.
func NewRack(id string) *Rack {
	rack := new(Rack)
	rack.id = NodeId(id)
	rack.nodeType = "Rack"
	rack.diskUsages = newDiskUsages()
	rack.children = map[NodeId]Node{}
	rack.capacityReservations = newCapacityReservations()
	// Back-reference from the embedded implementation to the concrete node.
	rack.NodeImpl.value = rack
	return rack
}
// FindDataNode scans the rack's children and returns the first DataNode whose
// MatchLocation(ip, port) reports true. It returns nil when no child matches.
//
// Every child is type-asserted to *DataNode; a child of any other type panics.
func (r *Rack) FindDataNode(ip string, port int) *DataNode {
	for _, child := range r.Children() {
		if node := child.(*DataNode); node.MatchLocation(ip, port) {
			return node
		}
	}
	return nil
}
// FindDataNodeById looks up a DataNode directly in the children map, keyed by
// NodeId(id), while holding the rack's read lock. It returns nil when no child
// is registered under that id.
//
// The stored child is type-asserted to *DataNode; any other type panics.
func (r *Rack) FindDataNodeById(id string) *DataNode {
	r.RLock()
	defer r.RUnlock()

	child, found := r.children[NodeId(id)]
	if !found {
		return nil
	}
	return child.(*DataNode)
}
// GetOrCreateDataNode returns the DataNode for a volume server, creating and
// linking it when the rack does not know it yet. The rack's write lock is held
// for the whole call.
//
// Resolution happens in three steps:
//
//  1. The node id is computed by util.GetVolumeServerId from the trimmed id,
//     ip and port. If a child is registered under that id it is reused: an
//     address change is logged, then Ip, Port, GrpcPort, PublicUrl and
//     LastSeen are refreshed.
//  2. If the node id differs from the plain ip:port form, the children are
//     scanned for a node at the same location. Only a child whose key is
//     exactly ip:port (legacy identification) is re-keyed to the new id; a
//     child with a different key at that location is reported with a warning
//     and left alone.
//  3. Otherwise a new DataNode is created, linked into the rack, and given one
//     Disk child per entry of maxVolumeCounts with that entry's max volume
//     count.
//
// maxVolumeCounts is only consulted when a new node is created.
func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode {
	r.Lock()
	defer r.Unlock()

	// Surrounding whitespace in the supplied id is ignored.
	id = strings.TrimSpace(id)
	// Explicit id when given, otherwise ip:port for backward compatibility.
	nodeId := util.GetVolumeServerId(id, ip, port)
	key := NodeId(nodeId)

	// Step 1: direct map hit on the stable identity.
	if existing, found := r.children[key]; found {
		node := existing.(*DataNode)
		// Report a moved server (e.g. pod rescheduled in K8s) before overwriting.
		if node.Ip != ip || node.Port != port {
			glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, node.Ip, node.Port, ip, port)
		}
		node.Ip, node.Port = ip, port
		node.GrpcPort, node.PublicUrl = grpcPort, publicUrl
		node.LastSeen = time.Now().Unix()
		return node
	}

	// Step 2: an explicit id was supplied, so look for a legacy entry that is
	// still keyed by ip:port and migrate it to the explicit id.
	legacyId := util.JoinHostPort(ip, port)
	if nodeId != legacyId {
		for childId, child := range r.children {
			node := child.(*DataNode)
			if !node.MatchLocation(ip, port) {
				continue
			}
			// A different key means this is another explicitly-identified node
			// that merely shares the address; merging them would be wrong.
			if string(childId) != legacyId {
				glog.Warningf("Volume server with id %s has ip:port %s which is used by node %s", nodeId, legacyId, childId)
				continue
			}

			glog.V(0).Infof("Volume server %s transitioning id from %s to %s", node.Url(), childId, nodeId)
			// Move the entry to its new key. Mutating the map mid-range is safe
			// here because the function returns right after.
			delete(r.children, childId)
			node.id = key
			r.children[key] = node

			node.GrpcPort, node.PublicUrl = grpcPort, publicUrl
			node.LastSeen = time.Now().Unix()
			return node
		}
	}

	// Step 3: unknown server, register a fresh node with its disks.
	created := NewDataNode(nodeId)
	created.Ip, created.Port = ip, port
	created.GrpcPort, created.PublicUrl = grpcPort, publicUrl
	created.LastSeen = time.Now().Unix()
	r.doLinkChildNode(created)

	for diskType, maxCount := range maxVolumeCounts {
		disk := NewDisk(diskType)
		usage := disk.diskUsages.getOrCreateDisk(types.ToDiskType(diskType))
		usage.maxVolumeCount = int64(maxCount)
		created.LinkChildNode(disk)
	}
	return created
}
// RackInfo is a JSON-serialisable snapshot of a rack: its node id and the
// info records of its data nodes. It is produced by (*Rack).ToInfo.
type RackInfo struct {
// Id is the rack's node id, encoded under the JSON key "Id".
Id NodeId `json:"Id"`
// DataNodes holds one entry per data node, encoded under "DataNodes".
DataNodes []DataNodeInfo `json:"DataNodes"`
}
// ToInfo returns a RackInfo carrying the rack's id and the ToInfo() record of
// every child data node, ordered by Url using strings.Compare.
//
// DataNodes stays nil when the rack has no children. Each child is
// type-asserted to *DataNode; any other type panics.
func (r *Rack) ToInfo() (info RackInfo) {
	info.Id = r.Id()
	for _, child := range r.Children() {
		info.DataNodes = append(info.DataNodes, child.(*DataNode).ToInfo())
	}
	// Sort by Url so the output order does not depend on child iteration order.
	byUrl := func(left, right DataNodeInfo) int {
		return strings.Compare(left.Url, right.Url)
	}
	slices.SortFunc(info.DataNodes, byUrl)
	return info
}
// ToRackInfo converts the rack into its master_pb.RackInfo message: the rack
// id as a string, the disk infos produced by the rack's disk usages, and one
// ToDataNodeInfo() entry per child, in the order Children() yields them.
//
// Each child is type-asserted to *DataNode; any other type panics.
func (r *Rack) ToRackInfo() *master_pb.RackInfo {
	rackInfo := &master_pb.RackInfo{
		Id:        string(r.Id()),
		DiskInfos: r.diskUsages.ToDiskInfo(),
	}
	for _, child := range r.Children() {
		rackInfo.DataNodeInfos = append(rackInfo.DataNodeInfos, child.(*DataNode).ToDataNodeInfo())
	}
	return rackInfo
}
|