diff options
Diffstat (limited to 'weed/admin/topology/topology_management.go')
| -rw-r--r-- | weed/admin/topology/topology_management.go | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go new file mode 100644 index 000000000..e12839484 --- /dev/null +++ b/weed/admin/topology/topology_management.go @@ -0,0 +1,253 @@ +package topology + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +// UpdateTopology updates the topology information from master +func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error { + at.mutex.Lock() + defer at.mutex.Unlock() + + at.topologyInfo = topologyInfo + at.lastUpdated = time.Now() + + // Rebuild structured topology + at.nodes = make(map[string]*activeNode) + at.disks = make(map[string]*activeDisk) + + for _, dc := range topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, nodeInfo := range rack.DataNodeInfos { + node := &activeNode{ + nodeID: nodeInfo.Id, + dataCenter: dc.Id, + rack: rack.Id, + nodeInfo: nodeInfo, + disks: make(map[uint32]*activeDisk), + } + + // Add disks for this node + for diskType, diskInfo := range nodeInfo.DiskInfos { + disk := &activeDisk{ + DiskInfo: &DiskInfo{ + NodeID: nodeInfo.Id, + DiskID: diskInfo.DiskId, + DiskType: diskType, + DataCenter: dc.Id, + Rack: rack.Id, + DiskInfo: diskInfo, + }, + } + + diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) + node.disks[diskInfo.DiskId] = disk + at.disks[diskKey] = disk + } + + at.nodes[nodeInfo.Id] = node + } + } + } + + // Rebuild performance indexes for O(1) lookups + at.rebuildIndexes() + + // Reassign task states to updated topology + at.reassignTaskStates() + + glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries", + len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex)) + return nil +} + +// GetAvailableDisks returns disks that can accept new tasks of the given type +// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity +func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + var available []*DiskInfo + + for _, disk := range at.disks { + if disk.NodeID == excludeNodeID { + continue // Skip excluded node + } + + if at.isDiskAvailable(disk, taskType) { + // Create a copy with current load count and effective capacity + diskCopy := *disk.DiskInfo + diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks) + available = append(available, &diskCopy) + } + } + + return available +} + +// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection) +func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool { + at.mutex.RLock() + defer at.mutex.RUnlock() + + for _, task := range at.recentTasks { + if task.VolumeID == volumeID && task.TaskType == taskType { + return true + } + } + + return false +} + +// GetAllNodes returns information about all nodes (public interface) +func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + result := make(map[string]*master_pb.DataNodeInfo) + for nodeID, node := range at.nodes { + result[nodeID] = node.nodeInfo + } + return result +} + +// GetTopologyInfo returns the current topology information (read-only access) +func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + return at.topologyInfo +} + +// GetNodeDisks returns all disks for a specific node +func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo { + at.mutex.RLock() + defer at.mutex.RUnlock() + + node, exists := at.nodes[nodeID] + if !exists { + return nil + } + + var disks []*DiskInfo + for _, disk := range node.disks { + diskCopy := *disk.DiskInfo + diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks) + disks = append(disks, &diskCopy) + } + + return disks +} + +// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups +func (at *ActiveTopology) rebuildIndexes() { + // Clear existing indexes + at.volumeIndex = make(map[uint32][]string) + at.ecShardIndex = make(map[uint32][]string) + + // Rebuild indexes from current topology + for _, dc := range at.topologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, nodeInfo := range rack.DataNodeInfos { + for _, diskInfo := range nodeInfo.DiskInfos { + diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId) + + // Index volumes + for _, volumeInfo := range diskInfo.VolumeInfos { + volumeID := volumeInfo.Id + at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey) + } + + // Index EC shards + for _, ecShardInfo := range diskInfo.EcShardInfos { + volumeID := ecShardInfo.Id + at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey) + } + } + } + } + } +} + +// GetVolumeLocations returns the disk locations for a volume using O(1) lookup +func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKeys, exists := at.volumeIndex[volumeID] + if !exists { + return []VolumeReplica{} + } + + var replicas []VolumeReplica + for _, diskKey := range diskKeys { + if disk, diskExists := at.disks[diskKey]; diskExists { + // Verify collection matches (since index doesn't include collection) + if at.volumeMatchesCollection(disk, volumeID, collection) { + replicas = append(replicas, VolumeReplica{ + ServerID: disk.NodeID, + DiskID: disk.DiskID, + }) + } + } + } + + return replicas +} + +// GetECShardLocations returns the disk locations for EC shards using O(1) lookup +func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica { + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKeys, exists := at.ecShardIndex[volumeID] + if !exists { + return []VolumeReplica{} + } + + var ecShards []VolumeReplica + for _, diskKey := range diskKeys { + if disk, diskExists := at.disks[diskKey]; diskExists { + // Verify collection matches (since index doesn't include collection) + if at.ecShardMatchesCollection(disk, volumeID, collection) { + ecShards = append(ecShards, VolumeReplica{ + ServerID: disk.NodeID, + DiskID: disk.DiskID, + }) + } + } + } + + return ecShards +} + +// volumeMatchesCollection checks if a volume on a disk matches the given collection +func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool { + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return false + } + + for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos { + if volumeInfo.Id == volumeID && volumeInfo.Collection == collection { + return true + } + } + return false +} + +// ecShardMatchesCollection checks if EC shards on a disk match the given collection +func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool { + if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return false + } + + for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos { + if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection { + return true + } + } + return false +} |
