Diffstat (limited to 'weed/admin/topology/topology_management.go')
-rw-r--r--  weed/admin/topology/topology_management.go  253
1 file changed, 253 insertions(+), 0 deletions(-)
diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go
new file mode 100644
index 000000000..e12839484
--- /dev/null
+++ b/weed/admin/topology/topology_management.go
@@ -0,0 +1,253 @@
+package topology
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+)
+
+// UpdateTopology updates the topology information from the master
+func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ at.topologyInfo = topologyInfo
+ at.lastUpdated = time.Now()
+
+ // Rebuild structured topology
+ at.nodes = make(map[string]*activeNode)
+ at.disks = make(map[string]*activeDisk)
+
+ for _, dc := range topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, nodeInfo := range rack.DataNodeInfos {
+ node := &activeNode{
+ nodeID: nodeInfo.Id,
+ dataCenter: dc.Id,
+ rack: rack.Id,
+ nodeInfo: nodeInfo,
+ disks: make(map[uint32]*activeDisk),
+ }
+
+ // Add disks for this node
+ for diskType, diskInfo := range nodeInfo.DiskInfos {
+ disk := &activeDisk{
+ DiskInfo: &DiskInfo{
+ NodeID: nodeInfo.Id,
+ DiskID: diskInfo.DiskId,
+ DiskType: diskType,
+ DataCenter: dc.Id,
+ Rack: rack.Id,
+ DiskInfo: diskInfo,
+ },
+ }
+
+ diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
+ node.disks[diskInfo.DiskId] = disk
+ at.disks[diskKey] = disk
+ }
+
+ at.nodes[nodeInfo.Id] = node
+ }
+ }
+ }
+
+ // Rebuild performance indexes for O(1) lookups
+ at.rebuildIndexes()
+
+ // Reassign task states to updated topology
+ at.reassignTaskStates()
+
+ glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries",
+ len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex))
+ return nil
+}
+
+// GetAvailableDisks returns disks that can accept new tasks of the given type
+// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity
+func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ var available []*DiskInfo
+
+ for _, disk := range at.disks {
+ if disk.NodeID == excludeNodeID {
+ continue // Skip excluded node
+ }
+
+ if at.isDiskAvailable(disk, taskType) {
+ // Create a copy with the current load count (pending + assigned tasks)
+ diskCopy := *disk.DiskInfo
+ diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
+ available = append(available, &diskCopy)
+ }
+ }
+
+ return available
+}
+
+// HasRecentTaskForVolume reports whether a volume recently had a task of the given type (to avoid immediate re-detection)
+func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ for _, task := range at.recentTasks {
+ if task.VolumeID == volumeID && task.TaskType == taskType {
+ return true
+ }
+ }
+
+ return false
+}
+
+// GetAllNodes returns information about all nodes (public interface)
+func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ result := make(map[string]*master_pb.DataNodeInfo)
+ for nodeID, node := range at.nodes {
+ result[nodeID] = node.nodeInfo
+ }
+ return result
+}
+
+// GetTopologyInfo returns the current topology information (read-only access)
+func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+ return at.topologyInfo
+}
+
+// GetNodeDisks returns all disks for a specific node
+func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ node, exists := at.nodes[nodeID]
+ if !exists {
+ return nil
+ }
+
+ var disks []*DiskInfo
+ for _, disk := range node.disks {
+ diskCopy := *disk.DiskInfo
+ diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
+ disks = append(disks, &diskCopy)
+ }
+
+ return disks
+}
+
+// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
+func (at *ActiveTopology) rebuildIndexes() {
+ // Clear existing indexes
+ at.volumeIndex = make(map[uint32][]string)
+ at.ecShardIndex = make(map[uint32][]string)
+
+ // Rebuild indexes from current topology
+ for _, dc := range at.topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, nodeInfo := range rack.DataNodeInfos {
+ for _, diskInfo := range nodeInfo.DiskInfos {
+ diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
+
+ // Index volumes
+ for _, volumeInfo := range diskInfo.VolumeInfos {
+ volumeID := volumeInfo.Id
+ at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey)
+ }
+
+ // Index EC shards
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ volumeID := ecShardInfo.Id
+ at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey)
+ }
+ }
+ }
+ }
+ }
+}
+
+// GetVolumeLocations returns the disk locations for a volume using O(1) lookup
+func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKeys, exists := at.volumeIndex[volumeID]
+ if !exists {
+ return []VolumeReplica{}
+ }
+
+ var replicas []VolumeReplica
+ for _, diskKey := range diskKeys {
+ if disk, diskExists := at.disks[diskKey]; diskExists {
+ // Verify the collection matches (the index is keyed by volume ID only)
+ if at.volumeMatchesCollection(disk, volumeID, collection) {
+ replicas = append(replicas, VolumeReplica{
+ ServerID: disk.NodeID,
+ DiskID: disk.DiskID,
+ })
+ }
+ }
+ }
+
+ return replicas
+}
+
+// GetECShardLocations returns the disk locations for EC shards using O(1) lookup
+func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKeys, exists := at.ecShardIndex[volumeID]
+ if !exists {
+ return []VolumeReplica{}
+ }
+
+ var ecShards []VolumeReplica
+ for _, diskKey := range diskKeys {
+ if disk, diskExists := at.disks[diskKey]; diskExists {
+ // Verify the collection matches (the index is keyed by volume ID only)
+ if at.ecShardMatchesCollection(disk, volumeID, collection) {
+ ecShards = append(ecShards, VolumeReplica{
+ ServerID: disk.NodeID,
+ DiskID: disk.DiskID,
+ })
+ }
+ }
+ }
+
+ return ecShards
+}
+
+// volumeMatchesCollection checks if a volume on a disk matches the given collection
+func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return false
+ }
+
+ for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos {
+ if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
+ return true
+ }
+ }
+ return false
+}
+
+// ecShardMatchesCollection checks if EC shards on a disk match the given collection
+func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return false
+ }
+
+ for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos {
+ if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection {
+ return true
+ }
+ }
+ return false
+}
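
For context, a minimal sketch of how a caller outside this package might drive the code above: push a fresh topology snapshot from the master, then resolve replica and EC shard placements through the indexes rebuilt by UpdateTopology. The package name "example" and the helper refreshAndLocate are illustrative and not part of this change.

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// refreshAndLocate feeds a fresh snapshot from the master into ActiveTopology,
// then looks up volume replicas and EC shard locations for one volume.
func refreshAndLocate(at *topology.ActiveTopology, info *master_pb.TopologyInfo, volumeID uint32, collection string) error {
	if err := at.UpdateTopology(info); err != nil {
		return fmt.Errorf("update topology: %w", err)
	}
	for _, r := range at.GetVolumeLocations(volumeID, collection) {
		fmt.Printf("volume %d: replica on %s (disk %d)\n", volumeID, r.ServerID, r.DiskID)
	}
	for _, s := range at.GetECShardLocations(volumeID, collection) {
		fmt.Printf("volume %d: EC shards on %s (disk %d)\n", volumeID, s.ServerID, s.DiskID)
	}
	return nil
}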
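A second sketch shows how a maintenance-task detector might combine HasRecentTaskForVolume and GetAvailableDisks: skip volumes that were just worked on, then pick a candidate disk. The pickTargetDisk helper and its least-loaded selection policy are assumptions for illustration; the task type and source node come from the caller.

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// pickTargetDisk skips recently handled volumes, then returns the
// least-loaded available disk outside the source node (illustrative policy).
func pickTargetDisk(at *topology.ActiveTopology, taskType topology.TaskType, volumeID uint32, sourceNode string) (*topology.DiskInfo, error) {
	// Avoid immediately re-detecting a volume that just had a task of this type.
	if at.HasRecentTaskForVolume(volumeID, taskType) {
		return nil, fmt.Errorf("volume %d had a recent %v task, skipping", volumeID, taskType)
	}

	var best *topology.DiskInfo
	for _, disk := range at.GetAvailableDisks(taskType, sourceNode) {
		if best == nil || disk.LoadCount < best.LoadCount {
			best = disk
		}
	}
	if best == nil {
		return nil, fmt.Errorf("no available target disk for volume %d", volumeID)
	}
	return best, nil
}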
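Finally, a small read-only sketch over the accessors GetAllNodes and GetNodeDisks; dumpNodeDisks is a hypothetical name used only for this example.

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// dumpNodeDisks walks every node and prints its disks; LoadCount reflects the
// disk's pending plus assigned tasks at the time of the call.
func dumpNodeDisks(at *topology.ActiveTopology) {
	for nodeID := range at.GetAllNodes() {
		for _, disk := range at.GetNodeDisks(nodeID) {
			fmt.Printf("%s disk %d (%s/%s, %s): load %d\n",
				disk.NodeID, disk.DiskID, disk.DataCenter, disk.Rack, disk.DiskType, disk.LoadCount)
		}
	}
}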