aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/command/volume.go7
-rw-r--r--weed/pb/master.proto2
-rw-r--r--weed/pb/master_pb/master.pb.go26
-rw-r--r--weed/pb/server_address.go12
-rw-r--r--weed/server/master_grpc_server.go4
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/volume_server.go4
-rw-r--r--weed/storage/store.go8
-rw-r--r--weed/storage/store_load_balancing_test.go2
-rw-r--r--weed/topology/data_node.go1
-rw-r--r--weed/topology/rack.go69
-rw-r--r--weed/topology/topology_test.go119
-rw-r--r--weed/util/network.go11
14 files changed, 240 insertions, 28 deletions
diff --git a/weed/command/server.go b/weed/command/server.go
index 47df30fc2..d729502f0 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -133,6 +133,7 @@ func init() {
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
+ serverOptions.v.id = cmdServer.Flag.String("volume.id", "", "volume server id. If empty, default to ip:port")
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index e21437e9a..514553172 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -41,6 +41,7 @@ type VolumeServerOptions struct {
folderMaxLimits []int32
idxFolder *string
ip *string
+ id *string
publicUrl *string
bindIp *string
mastersString *string
@@ -78,6 +79,7 @@ func init() {
v.portGrpc = cmdVolume.Flag.Int("port.grpc", 0, "grpc listen port")
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
+ v.id = cmdVolume.Flag.String("id", "", "volume server id. If empty, default to ip:port")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
v.mastersString = cmdVolume.Flag.String("master", "localhost:9333", "comma-separated master servers")
@@ -253,8 +255,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeNeedleMapKind = storage.NeedleMapLevelDbLarge
}
+ // Determine volume server ID: if not specified, use ip:port
+ volumeServerId := util.GetVolumeServerId(*v.id, *v.ip, *v.port)
+
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
- *v.ip, *v.port, *v.portGrpc, *v.publicUrl,
+ *v.ip, *v.port, *v.portGrpc, *v.publicUrl, volumeServerId,
v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes,
*v.idxFolder,
volumeNeedleMapKind,
diff --git a/weed/pb/master.proto b/weed/pb/master.proto
index f8049c466..afbf31de9 100644
--- a/weed/pb/master.proto
+++ b/weed/pb/master.proto
@@ -81,6 +81,7 @@ message Heartbeat {
map<string, uint32> max_volume_counts = 4;
uint32 grpc_port = 20;
repeated string location_uuids = 21;
+ string id = 22; // volume server id, independent of ip:port for stable identification
}
message HeartbeatResponse {
@@ -289,6 +290,7 @@ message DataNodeInfo {
string id = 1;
map<string, DiskInfo> diskInfos = 2;
uint32 grpc_port = 3;
+ string address = 4; // ip:port for connecting to the volume server
}
message RackInfo {
string id = 1;
diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go
index 19df43d71..41d46fad1 100644
--- a/weed/pb/master_pb/master.pb.go
+++ b/weed/pb/master_pb/master.pb.go
@@ -44,6 +44,7 @@ type Heartbeat struct {
MaxVolumeCounts map[string]uint32 `protobuf:"bytes,4,rep,name=max_volume_counts,json=maxVolumeCounts,proto3" json:"max_volume_counts,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
GrpcPort uint32 `protobuf:"varint,20,opt,name=grpc_port,json=grpcPort,proto3" json:"grpc_port,omitempty"`
LocationUuids []string `protobuf:"bytes,21,rep,name=location_uuids,json=locationUuids,proto3" json:"location_uuids,omitempty"`
+ Id string `protobuf:"bytes,22,opt,name=id,proto3" json:"id,omitempty"` // volume server id, independent of ip:port for stable identification
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -204,6 +205,13 @@ func (x *Heartbeat) GetLocationUuids() []string {
return nil
}
+func (x *Heartbeat) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
type HeartbeatResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volume_size_limit,json=volumeSizeLimit,proto3" json:"volume_size_limit,omitempty"`
@@ -2039,6 +2047,7 @@ type DataNodeInfo struct {
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
DiskInfos map[string]*DiskInfo `protobuf:"bytes,2,rep,name=diskInfos,proto3" json:"diskInfos,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
GrpcPort uint32 `protobuf:"varint,3,opt,name=grpc_port,json=grpcPort,proto3" json:"grpc_port,omitempty"`
+ Address string `protobuf:"bytes,4,opt,name=address,proto3" json:"address,omitempty"` // ip:port for connecting to the volume server
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -2094,6 +2103,13 @@ func (x *DataNodeInfo) GetGrpcPort() uint32 {
return 0
}
+func (x *DataNodeInfo) GetAddress() string {
+ if x != nil {
+ return x.Address
+ }
+ return ""
+}
+
type RackInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
@@ -4038,7 +4054,7 @@ var File_master_proto protoreflect.FileDescriptor
const file_master_proto_rawDesc = "" +
"\n" +
- "\fmaster.proto\x12\tmaster_pb\"\xc0\a\n" +
+ "\fmaster.proto\x12\tmaster_pb\"\xd0\a\n" +
"\tHeartbeat\x12\x0e\n" +
"\x02ip\x18\x01 \x01(\tR\x02ip\x12\x12\n" +
"\x04port\x18\x02 \x01(\rR\x04port\x12\x1d\n" +
@@ -4063,7 +4079,8 @@ const file_master_proto_rawDesc = "" +
"\x10has_no_ec_shards\x18\x13 \x01(\bR\rhasNoEcShards\x12U\n" +
"\x11max_volume_counts\x18\x04 \x03(\v2).master_pb.Heartbeat.MaxVolumeCountsEntryR\x0fmaxVolumeCounts\x12\x1b\n" +
"\tgrpc_port\x18\x14 \x01(\rR\bgrpcPort\x12%\n" +
- "\x0elocation_uuids\x18\x15 \x03(\tR\rlocationUuids\x1aB\n" +
+ "\x0elocation_uuids\x18\x15 \x03(\tR\rlocationUuids\x12\x0e\n" +
+ "\x02id\x18\x16 \x01(\tR\x02id\x1aB\n" +
"\x14MaxVolumeCountsEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\rR\x05value:\x028\x01\"\xcd\x02\n" +
@@ -4254,11 +4271,12 @@ const file_master_proto_rawDesc = "" +
"\fvolume_infos\x18\x06 \x03(\v2#.master_pb.VolumeInformationMessageR\vvolumeInfos\x12P\n" +
"\x0eec_shard_infos\x18\a \x03(\v2*.master_pb.VolumeEcShardInformationMessageR\fecShardInfos\x12.\n" +
"\x13remote_volume_count\x18\b \x01(\x03R\x11remoteVolumeCount\x12\x17\n" +
- "\adisk_id\x18\t \x01(\rR\x06diskId\"\xd4\x01\n" +
+ "\adisk_id\x18\t \x01(\rR\x06diskId\"\xee\x01\n" +
"\fDataNodeInfo\x12\x0e\n" +
"\x02id\x18\x01 \x01(\tR\x02id\x12D\n" +
"\tdiskInfos\x18\x02 \x03(\v2&.master_pb.DataNodeInfo.DiskInfosEntryR\tdiskInfos\x12\x1b\n" +
- "\tgrpc_port\x18\x03 \x01(\rR\bgrpcPort\x1aQ\n" +
+ "\tgrpc_port\x18\x03 \x01(\rR\bgrpcPort\x12\x18\n" +
+ "\aaddress\x18\x04 \x01(\tR\aaddress\x1aQ\n" +
"\x0eDiskInfosEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12)\n" +
"\x05value\x18\x02 \x01(\v2\x13.master_pb.DiskInfoR\x05value:\x028\x01\"\xf0\x01\n" +
diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go
index a0aa79ae4..943b85519 100644
--- a/weed/pb/server_address.go
+++ b/weed/pb/server_address.go
@@ -2,11 +2,12 @@ package pb
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
"net"
"strconv"
"strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
type ServerAddress string
@@ -32,7 +33,12 @@ func NewServerAddressWithGrpcPort(address string, grpcPort int) ServerAddress {
}
func NewServerAddressFromDataNode(dn *master_pb.DataNodeInfo) ServerAddress {
- return NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort))
+ // Use Address field if available (new behavior), fall back to Id for backward compatibility
+ addr := dn.Address
+ if addr == "" {
+ addr = dn.Id // backward compatibility: old nodes use ip:port as id
+ }
+ return NewServerAddressWithGrpcPort(addr, int(dn.GrpcPort))
}
func NewServerAddressFromLocation(dn *master_pb.Location) ServerAddress {
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index dcf279e1d..e053d9ea7 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -137,8 +137,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
- glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.Id, heartbeat.MaxVolumeCounts)
+ glog.V(0).Infof("added volume server %d: %v (id=%s, ip=%v:%d) %v", dn.Counter, dn.Id(), heartbeat.Id, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
uuidlist, err := ms.RegisterUuids(heartbeat)
if err != nil {
if stream_err := stream.Send(&master_pb.HeartbeatResponse{
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index a7ef8e7e9..d00cb5df4 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -253,7 +253,7 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
var locations []*master_pb.Location
for _, dn := range shardLocations {
locations = append(locations, &master_pb.Location{
- Url: string(dn.Id()),
+ Url: dn.Url(),
PublicUrl: dn.PublicUrl,
DataCenter: dn.GetDataCenterId(),
})
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 4f8a7fb0d..65909996a 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -55,7 +55,7 @@ type VolumeServer struct {
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
- port int, grpcPort int, publicUrl string,
+ port int, grpcPort int, publicUrl string, id string,
folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
idxFolder string,
needleMapKind storage.NeedleMapKind,
@@ -114,7 +114,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.checkWithMaster()
- vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout)
+ vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, id, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
diff --git a/weed/storage/store.go b/weed/storage/store.go
index cc07f8702..30f33d6d9 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -63,6 +63,7 @@ type Store struct {
Port int
GrpcPort int
PublicUrl string
+ Id string // volume server id, independent of ip:port for stable identification
Locations []*DiskLocation
dataCenter string // optional information, overwriting master setting if exists
rack string // optional information, overwriting master setting if exists
@@ -76,13 +77,13 @@ type Store struct {
}
func (s *Store) String() (str string) {
- str = fmt.Sprintf("Ip:%s, Port:%d, GrpcPort:%d PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.GrpcPort, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
+ str = fmt.Sprintf("Id:%s, Ip:%s, Port:%d, GrpcPort:%d PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Id, s.Ip, s.Port, s.GrpcPort, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
return
}
-func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, dirnames []string, maxVolumeCounts []int32,
+func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, id string, dirnames []string, maxVolumeCounts []int32,
minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType, ldbTimeout int64) (s *Store) {
- s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
+ s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, Id: id, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
var wg sync.WaitGroup
@@ -414,6 +415,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
Port: uint32(s.Port),
GrpcPort: uint32(s.GrpcPort),
PublicUrl: s.PublicUrl,
+ Id: s.Id,
MaxVolumeCounts: maxVolumeCounts,
MaxFileKey: NeedleIdToUint64(maxFileKey),
DataCenter: s.dataCenter,
diff --git a/weed/storage/store_load_balancing_test.go b/weed/storage/store_load_balancing_test.go
index 15e709d53..35475a6ae 100644
--- a/weed/storage/store_load_balancing_test.go
+++ b/weed/storage/store_load_balancing_test.go
@@ -31,7 +31,7 @@ func newTestStore(t *testing.T, numDirs int) *Store {
diskTypes = append(diskTypes, types.HardDriveType)
}
- store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080",
+ store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "",
dirs, maxCounts, minFreeSpaces, "", NeedleMapInMemory, diskTypes, 3)
// Consume channel messages to prevent blocking
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 4f2dbe464..07e00ac0a 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -269,6 +269,7 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
Id: string(dn.Id()),
DiskInfos: make(map[string]*master_pb.DiskInfo),
GrpcPort: uint32(dn.GrpcPort),
+ Address: dn.Url(), // ip:port for connecting to the volume server
}
for _, c := range dn.Children() {
disk := c.(*Disk)
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index f526cd84d..1e5c8b632 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -5,6 +5,7 @@ import (
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -34,17 +35,73 @@ func (r *Rack) FindDataNode(ip string, port int) *DataNode {
}
return nil
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, maxVolumeCounts map[string]uint32) *DataNode {
+
+// FindDataNodeById finds a DataNode by its ID using O(1) map lookup
+func (r *Rack) FindDataNodeById(id string) *DataNode {
+ r.RLock()
+ defer r.RUnlock()
+ if c, ok := r.children[NodeId(id)]; ok {
+ return c.(*DataNode)
+ }
+ return nil
+}
+
+func (r *Rack) GetOrCreateDataNode(ip string, port int, grpcPort int, publicUrl string, id string, maxVolumeCounts map[string]uint32) *DataNode {
r.Lock()
defer r.Unlock()
- for _, c := range r.children {
+
+ // Normalize the id parameter (trim whitespace)
+ id = strings.TrimSpace(id)
+
+ // Determine the node ID: use provided id, or fall back to ip:port for backward compatibility
+ nodeId := util.GetVolumeServerId(id, ip, port)
+
+ // First, try to find by node ID using O(1) map lookup (stable identity)
+ if c, ok := r.children[NodeId(nodeId)]; ok {
dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- dn.LastSeen = time.Now().Unix()
- return dn
+ // Log if IP or Port changed (e.g., pod rescheduled in K8s)
+ if dn.Ip != ip || dn.Port != port {
+ glog.V(0).Infof("DataNode %s address changed from %s:%d to %s:%d", nodeId, dn.Ip, dn.Port, ip, port)
}
+ // Update the IP/Port in case they changed
+ dn.Ip = ip
+ dn.Port = port
+ dn.GrpcPort = grpcPort
+ dn.PublicUrl = publicUrl
+ dn.LastSeen = time.Now().Unix()
+ return dn
}
- dn := NewDataNode(util.JoinHostPort(ip, port))
+
+ // For backward compatibility: if explicit id was provided, also check by ip:port
+ // to handle transition from old (ip:port) to new (explicit id) behavior
+ ipPortId := util.JoinHostPort(ip, port)
+ if nodeId != ipPortId {
+ for oldId, c := range r.children {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ // Only transition if the oldId exactly matches ip:port (legacy identification).
+ // If oldId is different, this is a node with an explicit id that happens to
+ // reuse the same ip:port - don't incorrectly merge them.
+ if string(oldId) != ipPortId {
+ glog.Warningf("Volume server with id %s has ip:port %s which is used by node %s", nodeId, ipPortId, oldId)
+ continue
+ }
+ // Found a legacy node identified by ip:port, transition it to use the new explicit id
+ glog.V(0).Infof("Volume server %s transitioning id from %s to %s", dn.Url(), oldId, nodeId)
+ // Re-key the node in the children map with the new id
+ delete(r.children, oldId)
+ dn.id = NodeId(nodeId)
+ r.children[NodeId(nodeId)] = dn
+ // Update connection info in case they changed
+ dn.GrpcPort = grpcPort
+ dn.PublicUrl = publicUrl
+ dn.LastSeen = time.Now().Unix()
+ return dn
+ }
+ }
+ }
+
+ dn := NewDataNode(nodeId)
dn.Ip = ip
dn.Port = port
dn.GrpcPort = grpcPort
diff --git a/weed/topology/topology_test.go b/weed/topology/topology_test.go
index 8515d2f81..e5a8969fc 100644
--- a/weed/topology/topology_test.go
+++ b/weed/topology/topology_test.go
@@ -34,7 +34,7 @@ func TestHandlingVolumeServerHeartbeat(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
{
volumeCount := 7
@@ -180,7 +180,7 @@ func TestAddRemoveVolume(t *testing.T) {
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
maxVolumeCounts["ssd"] = 12
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
v := storage.VolumeInfo{
Id: needle.VolumeId(1),
@@ -218,7 +218,7 @@ func TestVolumeReadOnlyStatusChange(t *testing.T) {
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
// Create a writable volume
v := storage.VolumeInfo{
@@ -267,7 +267,7 @@ func TestVolumeReadOnlyAndRemoteStatusChange(t *testing.T) {
rack := dc.GetOrCreateRack("rack1")
maxVolumeCounts := make(map[string]uint32)
maxVolumeCounts[""] = 25
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", maxVolumeCounts)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", maxVolumeCounts)
// Create a writable, local volume
v := storage.VolumeInfo{
@@ -331,7 +331,7 @@ func TestListCollections(t *testing.T) {
topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
dc := topo.GetOrCreateDataCenter("dc1")
rack := dc.GetOrCreateRack("rack1")
- dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", nil)
+ dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, 0, "127.0.0.1", "", nil)
topo.RegisterVolumeLayout(storage.VolumeInfo{
Id: needle.VolumeId(1111),
@@ -396,3 +396,112 @@ func TestListCollections(t *testing.T) {
})
}
}
+
+func TestDataNodeIdBasedIdentification(t *testing.T) {
+ topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
+ dc := topo.GetOrCreateDataCenter("dc1")
+ rack := dc.GetOrCreateRack("rack1")
+
+ maxVolumeCounts := make(map[string]uint32)
+ maxVolumeCounts[""] = 10
+
+ // Test 1: Create a DataNode with explicit id
+ dn1 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-1", maxVolumeCounts)
+ if string(dn1.Id()) != "node-1" {
+ t.Errorf("expected node id 'node-1', got '%s'", dn1.Id())
+ }
+ if dn1.Ip != "10.0.0.1" {
+ t.Errorf("expected ip '10.0.0.1', got '%s'", dn1.Ip)
+ }
+
+ // Test 2: Same id with different IP should return the same DataNode (K8s pod reschedule scenario)
+ dn2 := rack.GetOrCreateDataNode("10.0.0.2", 8080, 18080, "10.0.0.2:8080", "node-1", maxVolumeCounts)
+ if dn1 != dn2 {
+ t.Errorf("expected same DataNode for same id, got different nodes")
+ }
+ // IP should be updated to the new value
+ if dn2.Ip != "10.0.0.2" {
+ t.Errorf("expected ip to be updated to '10.0.0.2', got '%s'", dn2.Ip)
+ }
+ if dn2.PublicUrl != "10.0.0.2:8080" {
+ t.Errorf("expected publicUrl to be updated to '10.0.0.2:8080', got '%s'", dn2.PublicUrl)
+ }
+
+ // Test 3: Different id should create a new DataNode
+ dn3 := rack.GetOrCreateDataNode("10.0.0.3", 8080, 18080, "10.0.0.3:8080", "node-2", maxVolumeCounts)
+ if string(dn3.Id()) != "node-2" {
+ t.Errorf("expected node id 'node-2', got '%s'", dn3.Id())
+ }
+ if dn1 == dn3 {
+ t.Errorf("expected different DataNode for different id")
+ }
+
+ // Test 4: Empty id should fall back to ip:port (backward compatibility)
+ dn4 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts)
+ if string(dn4.Id()) != "10.0.0.4:8080" {
+ t.Errorf("expected node id '10.0.0.4:8080' for empty id, got '%s'", dn4.Id())
+ }
+
+ // Test 5: Same ip:port with empty id should return the same DataNode
+ dn5 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "", maxVolumeCounts)
+ if dn4 != dn5 {
+ t.Errorf("expected same DataNode for same ip:port with empty id")
+ }
+
+ // Verify we have 3 unique DataNodes total:
+ // - node-1 (dn1/dn2 share the same id)
+ // - node-2 (dn3)
+ // - 10.0.0.4:8080 (dn4/dn5 share the same ip:port)
+ children := rack.Children()
+ if len(children) != 3 {
+ t.Errorf("expected 3 DataNodes, got %d", len(children))
+ }
+
+ // Test 6: Transition from ip:port to explicit id
+ // First, the node exists with ip:port as id (dn4/dn5)
+ // Now the same volume server starts sending an explicit id
+ dn6 := rack.GetOrCreateDataNode("10.0.0.4", 8080, 18080, "10.0.0.4:8080", "node-4-explicit", maxVolumeCounts)
+ // Should return the same DataNode instance
+ if dn6 != dn4 {
+ t.Errorf("expected same DataNode instance during transition")
+ }
+ // But the id should now be updated to the explicit id
+ if string(dn6.Id()) != "node-4-explicit" {
+ t.Errorf("expected node id to transition to 'node-4-explicit', got '%s'", dn6.Id())
+ }
+ // The node should be re-keyed in the children map
+ if rack.FindDataNodeById("node-4-explicit") != dn6 {
+ t.Errorf("expected to find DataNode by new explicit id")
+ }
+ // Old ip:port key should no longer work
+ if rack.FindDataNodeById("10.0.0.4:8080") != nil {
+ t.Errorf("expected old ip:port id to be removed from children map")
+ }
+
+ // Still 3 unique DataNodes (node-1, node-2, node-4-explicit)
+ children = rack.Children()
+ if len(children) != 3 {
+ t.Errorf("expected 3 DataNodes after transition, got %d", len(children))
+ }
+
+ // Test 7: Prevent incorrect transition when a new node reuses ip:port of a node with explicit id
+ // Scenario: node-1 runs at 10.0.0.1:8080, dies, new node-99 starts at same ip:port
+ // The transition should NOT happen because node-1 already has an explicit id
+ dn7 := rack.GetOrCreateDataNode("10.0.0.1", 8080, 18080, "10.0.0.1:8080", "node-99", maxVolumeCounts)
+ // Should create a NEW DataNode, not reuse node-1
+ if dn7 == dn1 {
+ t.Errorf("expected new DataNode for node-99, got reused node-1")
+ }
+ if string(dn7.Id()) != "node-99" {
+ t.Errorf("expected node id 'node-99', got '%s'", dn7.Id())
+ }
+ // node-1 should still exist with its original id
+ if rack.FindDataNodeById("node-1") == nil {
+ t.Errorf("node-1 should still exist")
+ }
+ // Now we have 4 DataNodes
+ children = rack.Children()
+ if len(children) != 4 {
+ t.Errorf("expected 4 DataNodes, got %d", len(children))
+ }
+}
diff --git a/weed/util/network.go b/weed/util/network.go
index 328808dbc..f7dbeebb7 100644
--- a/weed/util/network.go
+++ b/weed/util/network.go
@@ -64,3 +64,14 @@ func JoinHostPort(host string, port int) string {
}
return net.JoinHostPort(host, portStr)
}
+
+// GetVolumeServerId returns the volume server ID.
+// If id is provided (non-empty after trimming), use it as the identifier.
+// Otherwise, fall back to ip:port for backward compatibility.
+func GetVolumeServerId(id, ip string, port int) string {
+ volumeServerId := strings.TrimSpace(id)
+ if volumeServerId == "" {
+ volumeServerId = JoinHostPort(ip, port)
+ }
+ return volumeServerId
+}