Diffstat (limited to 'weed/admin/dash/ec_shard_management.go')
| -rw-r--r-- | weed/admin/dash/ec_shard_management.go | 734 |
1 file changed, 734 insertions, 0 deletions
diff --git a/weed/admin/dash/ec_shard_management.go b/weed/admin/dash/ec_shard_management.go
new file mode 100644
index 000000000..272890cf0
--- /dev/null
+++ b/weed/admin/dash/ec_shard_management.go
@@ -0,0 +1,734 @@
package dash

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

// GetClusterEcShards retrieves cluster EC shards data with pagination, sorting, and filtering
func (s *AdminServer) GetClusterEcShards(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcShardsData, error) {
	// Set defaults
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 1000 {
		pageSize = 100
	}
	if sortBy == "" {
		sortBy = "volume_id"
	}
	if sortOrder == "" {
		sortOrder = "asc"
	}

	var ecShards []EcShardWithInfo
	volumeShardsMap := make(map[uint32]map[int]bool) // volumeId -> set of shards present
	volumesWithAllShards := 0
	volumesWithMissingShards := 0

	// Get detailed EC shard information via gRPC
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							// Process EC shard information
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								volumeId := ecShardInfo.Id

								// Initialize volume shards map if needed
								if volumeShardsMap[volumeId] == nil {
									volumeShardsMap[volumeId] = make(map[int]bool)
								}

								// Create individual shard entries for each shard this server has
								shardBits := ecShardInfo.EcIndexBits
								for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
									if (shardBits & (1 << uint(shardId))) != 0 {
										// Mark this shard as present for this volume
										volumeShardsMap[volumeId][shardId] = true

										ecShard := EcShardWithInfo{
											VolumeID:     volumeId,
											ShardID:      uint32(shardId),
											Collection:   ecShardInfo.Collection,
											Size:         0, // EC shards don't have individual size in the API response
											Server:       node.Id,
											DataCenter:   dc.Id,
											Rack:         rack.Id,
											DiskType:     diskInfo.Type,
											ModifiedTime: 0, // Not available in current API
											EcIndexBits:  ecShardInfo.EcIndexBits,
											ShardCount:   getShardCount(ecShardInfo.EcIndexBits),
										}
										ecShards = append(ecShards, ecShard)
									}
								}
							}
						}
					}
				}
			}
		}

		return nil
	})

	if err != nil {
		return nil, err
	}

	// Calculate volume-level completeness (across all servers)
	volumeCompleteness := make(map[uint32]bool)
	volumeMissingShards := make(map[uint32][]int)

	for volumeId, shardsPresent := range volumeShardsMap {
		var missingShards []int
		shardCount := len(shardsPresent)

		// Find which shards are missing for this volume across ALL servers
		for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
			if !shardsPresent[shardId] {
				missingShards = append(missingShards, shardId)
			}
		}

		isComplete := (shardCount == erasure_coding.TotalShardsCount)
		volumeCompleteness[volumeId] = isComplete
		volumeMissingShards[volumeId] = missingShards

		if isComplete {
			volumesWithAllShards++
		} else {
			volumesWithMissingShards++
		}
	}

	// Update completeness info for each shard based on volume-level completeness
	for i := range ecShards {
		volumeId := ecShards[i].VolumeID
		ecShards[i].IsComplete = volumeCompleteness[volumeId]
		ecShards[i].MissingShards = volumeMissingShards[volumeId]
	}

	// Filter by collection if specified
	if collection != "" {
		var filteredShards []EcShardWithInfo
		for _, shard := range ecShards {
			if shard.Collection == collection {
				filteredShards = append(filteredShards, shard)
			}
		}
		ecShards = filteredShards
	}

	// Sort the results
	sortEcShards(ecShards, sortBy, sortOrder)

	// Calculate statistics for conditional display
	dataCenters := make(map[string]bool)
	racks := make(map[string]bool)
	collections := make(map[string]bool)

	for _, shard := range ecShards {
		dataCenters[shard.DataCenter] = true
		racks[shard.Rack] = true
		if shard.Collection != "" {
			collections[shard.Collection] = true
		}
	}

	// Pagination
	totalShards := len(ecShards)
	totalPages := (totalShards + pageSize - 1) / pageSize
	startIndex := (page - 1) * pageSize
	endIndex := startIndex + pageSize
	if endIndex > totalShards {
		endIndex = totalShards
	}

	if startIndex >= totalShards {
		startIndex = 0
		endIndex = 0
	}

	paginatedShards := ecShards[startIndex:endIndex]

	// Build response
	data := &ClusterEcShardsData{
		EcShards:     paginatedShards,
		TotalShards:  totalShards,
		TotalVolumes: len(volumeShardsMap),
		LastUpdated:  time.Now(),

		// Pagination
		CurrentPage: page,
		TotalPages:  totalPages,
		PageSize:    pageSize,

		// Sorting
		SortBy:    sortBy,
		SortOrder: sortOrder,

		// Statistics
		DataCenterCount: len(dataCenters),
		RackCount:       len(racks),
		CollectionCount: len(collections),

		// Conditional display flags
		ShowDataCenterColumn: len(dataCenters) > 1,
		ShowRackColumn:       len(racks) > 1,
		ShowCollectionColumn: len(collections) > 1 || collection != "",

		// Filtering
		FilterCollection: collection,

		// EC specific statistics
		ShardsPerVolume:          make(map[uint32]int), // This will be recalculated below
		VolumesWithAllShards:     volumesWithAllShards,
		VolumesWithMissingShards: volumesWithMissingShards,
	}

	// Recalculate ShardsPerVolume for the response
	for volumeId, shardsPresent := range volumeShardsMap {
		data.ShardsPerVolume[volumeId] = len(shardsPresent)
	}

	// Set single values when only one exists
	if len(dataCenters) == 1 {
		for dc := range dataCenters {
			data.SingleDataCenter = dc
			break
		}
	}
	if len(racks) == 1 {
		for rack := range racks {
			data.SingleRack = rack
			break
		}
	}
	if len(collections) == 1 {
		for col := range collections {
			data.SingleCollection = col
			break
		}
	}

	return data, nil
}

// GetClusterEcVolumes retrieves cluster EC volumes data grouped by volume ID with shard locations
func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string, sortOrder string, collection string) (*ClusterEcVolumesData, error) {
	// Set defaults
	if page < 1 {
		page = 1
	}
	if pageSize < 1 || pageSize > 1000 {
		pageSize = 100
	}
	if sortBy == "" {
		sortBy = "volume_id"
	}
	if sortOrder == "" {
		sortOrder = "asc"
	}

	volumeData := make(map[uint32]*EcVolumeWithShards)
	totalShards := 0

	// Get detailed EC shard information via gRPC
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							// Process EC shard information
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								volumeId := ecShardInfo.Id

								// Initialize volume data if needed
								if volumeData[volumeId] == nil {
									volumeData[volumeId] = &EcVolumeWithShards{
										VolumeID:       volumeId,
										Collection:     ecShardInfo.Collection,
										TotalShards:    0,
										IsComplete:     false,
										MissingShards:  []int{},
										ShardLocations: make(map[int]string),
										ShardSizes:     make(map[int]int64),
										DataCenters:    []string{},
										Servers:        []string{},
										Racks:          []string{},
									}
								}

								volume := volumeData[volumeId]

								// Track data centers and servers
								dcExists := false
								for _, existingDc := range volume.DataCenters {
									if existingDc == dc.Id {
										dcExists = true
										break
									}
								}
								if !dcExists {
									volume.DataCenters = append(volume.DataCenters, dc.Id)
								}

								serverExists := false
								for _, existingServer := range volume.Servers {
									if existingServer == node.Id {
										serverExists = true
										break
									}
								}
								if !serverExists {
									volume.Servers = append(volume.Servers, node.Id)
								}

								// Track racks
								rackExists := false
								for _, existingRack := range volume.Racks {
									if existingRack == rack.Id {
										rackExists = true
										break
									}
								}
								if !rackExists {
									volume.Racks = append(volume.Racks, rack.Id)
								}

								// Process each shard this server has for this volume
								shardBits := ecShardInfo.EcIndexBits
								for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
									if (shardBits & (1 << uint(shardId))) != 0 {
										// Record shard location
										volume.ShardLocations[shardId] = node.Id
										totalShards++
									}
								}
							}
						}
					}
				}
			}
		}

		return nil
	})

	if err != nil {
		return nil, err
	}

	// Collect shard size information from volume servers
	for volumeId, volume := range volumeData {
		// Group servers by volume to minimize gRPC calls
		serverHasVolume := make(map[string]bool)
		for _, server := range volume.Servers {
			serverHasVolume[server] = true
		}

		// Query each server for shard sizes
		for server := range serverHasVolume {
			err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
				resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
					VolumeId: volumeId,
				})
				if err != nil {
					glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeId, err)
					return nil // Continue with other servers, don't fail the entire request
				}

				// Update shard sizes
				for _, shardInfo := range resp.EcShardInfos {
					volume.ShardSizes[int(shardInfo.ShardId)] = shardInfo.Size
				}

				return nil
			})
			if err != nil {
				glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
			}
		}
	}

	// Calculate completeness for each volume
	completeVolumes := 0
	incompleteVolumes := 0

	for _, volume := range volumeData {
		volume.TotalShards = len(volume.ShardLocations)

		// Find missing shards
		var missingShards []int
		for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
			if _, exists := volume.ShardLocations[shardId]; !exists {
				missingShards = append(missingShards, shardId)
			}
		}

		volume.MissingShards = missingShards
		volume.IsComplete = (len(missingShards) == 0)

		if volume.IsComplete {
			completeVolumes++
		} else {
			incompleteVolumes++
		}
	}

	// Convert map to slice
	var ecVolumes []EcVolumeWithShards
	for _, volume := range volumeData {
		// Filter by collection if specified
		if collection == "" || volume.Collection == collection {
			ecVolumes = append(ecVolumes, *volume)
		}
	}

	// Sort the results
	sortEcVolumes(ecVolumes, sortBy, sortOrder)

	// Calculate statistics for conditional display
	dataCenters := make(map[string]bool)
	collections := make(map[string]bool)

	for _, volume := range ecVolumes {
		for _, dc := range volume.DataCenters {
			dataCenters[dc] = true
		}
		if volume.Collection != "" {
			collections[volume.Collection] = true
		}
	}

	// Pagination
	totalVolumes := len(ecVolumes)
	totalPages := (totalVolumes + pageSize - 1) / pageSize
	startIndex := (page - 1) * pageSize
	endIndex := startIndex + pageSize
	if endIndex > totalVolumes {
		endIndex = totalVolumes
	}

	if startIndex >= totalVolumes {
		startIndex = 0
		endIndex = 0
	}

	paginatedVolumes := ecVolumes[startIndex:endIndex]

	// Build response
	data := &ClusterEcVolumesData{
		EcVolumes:    paginatedVolumes,
		TotalVolumes: totalVolumes,
		LastUpdated:  time.Now(),

		// Pagination
		Page:       page,
		PageSize:   pageSize,
		TotalPages: totalPages,

		// Sorting
		SortBy:    sortBy,
		SortOrder: sortOrder,

		// Filtering
		Collection: collection,

		// Conditional display flags
		ShowDataCenterColumn: len(dataCenters) > 1,
		ShowRackColumn:       false, // We don't track racks in this view for simplicity
		ShowCollectionColumn: len(collections) > 1 || collection != "",

		// Statistics
		CompleteVolumes:   completeVolumes,
		IncompleteVolumes: incompleteVolumes,
		TotalShards:       totalShards,
	}

	return data, nil
}

// sortEcVolumes sorts EC volumes based on the specified field and order
func sortEcVolumes(volumes []EcVolumeWithShards, sortBy string, sortOrder string) {
	sort.Slice(volumes, func(i, j int) bool {
		var less bool
		switch sortBy {
		case "volume_id":
			less = volumes[i].VolumeID < volumes[j].VolumeID
		case "collection":
			if volumes[i].Collection == volumes[j].Collection {
				less = volumes[i].VolumeID < volumes[j].VolumeID
			} else {
				less = volumes[i].Collection < volumes[j].Collection
			}
		case "total_shards":
			if volumes[i].TotalShards == volumes[j].TotalShards {
				less = volumes[i].VolumeID < volumes[j].VolumeID
			} else {
				less = volumes[i].TotalShards < volumes[j].TotalShards
			}
		case "completeness":
			// Complete volumes first, then by volume ID
			if volumes[i].IsComplete == volumes[j].IsComplete {
				less = volumes[i].VolumeID < volumes[j].VolumeID
			} else {
				less = volumes[i].IsComplete && !volumes[j].IsComplete
			}
		default:
			less = volumes[i].VolumeID < volumes[j].VolumeID
		}

		if sortOrder == "desc" {
			return !less
		}
		return less
	})
}

// getShardCount returns the number of shards represented by the bitmap
func getShardCount(ecIndexBits uint32) int {
	count := 0
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if (ecIndexBits & (1 << uint(i))) != 0 {
			count++
		}
	}
	return count
}

// getMissingShards returns a slice of missing shard IDs for a volume
func getMissingShards(ecIndexBits uint32) []int {
	var missing []int
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if (ecIndexBits & (1 << uint(i))) == 0 {
			missing = append(missing, i)
		}
	}
	return missing
}

// sortEcShards sorts EC shards based on the specified field and order
func sortEcShards(shards []EcShardWithInfo, sortBy string, sortOrder string) {
	sort.Slice(shards, func(i, j int) bool {
		var less bool
		switch sortBy {
		case "shard_id":
			less = shards[i].ShardID < shards[j].ShardID
		case "server":
			if shards[i].Server == shards[j].Server {
				less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
			} else {
				less = shards[i].Server < shards[j].Server
			}
		case "data_center":
			if shards[i].DataCenter == shards[j].DataCenter {
				less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
			} else {
				less = shards[i].DataCenter < shards[j].DataCenter
			}
		case "rack":
			if shards[i].Rack == shards[j].Rack {
				less = shards[i].ShardID < shards[j].ShardID // Secondary sort by shard ID
			} else {
				less = shards[i].Rack < shards[j].Rack
			}
		default:
			less = shards[i].ShardID < shards[j].ShardID
		}

		if sortOrder == "desc" {
			return !less
		}
		return less
	})
}

// GetEcVolumeDetails retrieves detailed information about a specific EC volume
func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrder string) (*EcVolumeDetailsData, error) {
	// Set defaults
	if sortBy == "" {
		sortBy = "shard_id"
	}
	if sortOrder == "" {
		sortOrder = "asc"
	}

	var shards []EcShardWithInfo
	var collection string
	dataCenters := make(map[string]bool)
	servers := make(map[string]bool)

	// Get detailed EC shard information for the specific volume via gRPC
	err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
		resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		if err != nil {
			return err
		}

		if resp.TopologyInfo != nil {
			for _, dc := range resp.TopologyInfo.DataCenterInfos {
				for _, rack := range dc.RackInfos {
					for _, node := range rack.DataNodeInfos {
						for _, diskInfo := range node.DiskInfos {
							// Process EC shard information for this specific volume
							for _, ecShardInfo := range diskInfo.EcShardInfos {
								if ecShardInfo.Id == volumeID {
									collection = ecShardInfo.Collection
									dataCenters[dc.Id] = true
									servers[node.Id] = true

									// Create individual shard entries for each shard this server has
									shardBits := ecShardInfo.EcIndexBits
									for shardId := 0; shardId < erasure_coding.TotalShardsCount; shardId++ {
										if (shardBits & (1 << uint(shardId))) != 0 {
											ecShard := EcShardWithInfo{
												VolumeID:     ecShardInfo.Id,
												ShardID:      uint32(shardId),
												Collection:   ecShardInfo.Collection,
												Size:         0, // EC shards don't have individual size in the API response
												Server:       node.Id,
												DataCenter:   dc.Id,
												Rack:         rack.Id,
												DiskType:     diskInfo.Type,
												ModifiedTime: 0, // Not available in current API
												EcIndexBits:  ecShardInfo.EcIndexBits,
												ShardCount:   getShardCount(ecShardInfo.EcIndexBits),
											}
											shards = append(shards, ecShard)
										}
									}
								}
							}
						}
					}
				}
			}
		}

		return nil
	})

	if err != nil {
		return nil, err
	}

	if len(shards) == 0 {
		return nil, fmt.Errorf("EC volume %d not found", volumeID)
	}

	// Collect shard size information from volume servers
	shardSizeMap := make(map[string]map[uint32]uint64) // server -> shardId -> size
	for _, shard := range shards {
		server := shard.Server
		if _, exists := shardSizeMap[server]; !exists {
			// Query this server for shard sizes
			err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
				resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
					VolumeId: volumeID,
				})
				if err != nil {
					glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeID, err)
					return nil // Continue with other servers, don't fail the entire request
				}

				// Store shard sizes for this server
				shardSizeMap[server] = make(map[uint32]uint64)
				for _, shardInfo := range resp.EcShardInfos {
					shardSizeMap[server][shardInfo.ShardId] = uint64(shardInfo.Size)
				}

				return nil
			})
			if err != nil {
				glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
			}
		}
	}

	// Update shard sizes in the shards array
	for i := range shards {
		server := shards[i].Server
		shardId := shards[i].ShardID
		if serverSizes, exists := shardSizeMap[server]; exists {
			if size, exists := serverSizes[shardId]; exists {
				shards[i].Size = size
			}
		}
	}

	// Calculate completeness based on unique shard IDs
	foundShards := make(map[int]bool)
	for _, shard := range shards {
		foundShards[int(shard.ShardID)] = true
	}

	totalUniqueShards := len(foundShards)
	isComplete := (totalUniqueShards == erasure_coding.TotalShardsCount)

	// Calculate missing shards
	var missingShards []int
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		if !foundShards[i] {
			missingShards = append(missingShards, i)
		}
	}

	// Update completeness info for each shard
	for i := range shards {
		shards[i].IsComplete = isComplete
		shards[i].MissingShards = missingShards
	}

	// Sort shards based on parameters
	sortEcShards(shards, sortBy, sortOrder)

	// Convert maps to slices
	var dcList []string
	for dc := range dataCenters {
		dcList = append(dcList, dc)
	}
	var serverList []string
	for server := range servers {
		serverList = append(serverList, server)
	}

	data := &EcVolumeDetailsData{
		VolumeID:      volumeID,
		Collection:    collection,
		Shards:        shards,
		TotalShards:   totalUniqueShards,
		IsComplete:    isComplete,
		MissingShards: missingShards,
		DataCenters:   dcList,
		Servers:       serverList,
		LastUpdated:   time.Now(),
		SortBy:        sortBy,
		SortOrder:     sortOrder,
	}

	return data, nil
}
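
The core trick throughout this patch is the EcIndexBits bitmap: bit i set means shard i of the volume is present on that server, and SeaweedFS erasure coding uses 14 shards per volume (10 data + 4 parity), which is what erasure_coding.TotalShardsCount denotes. Below is a minimal, self-contained sketch (not part of the patch; the local constant and main harness are illustrative) mirroring how getShardCount and getMissingShards decode that bitmap:

	package main

	import "fmt"

	const totalShardsCount = 14 // mirrors erasure_coding.TotalShardsCount (10 data + 4 parity)

	// shardCount counts the set bits, i.e. how many shards a server holds.
	func shardCount(ecIndexBits uint32) int {
		count := 0
		for i := 0; i < totalShardsCount; i++ {
			if ecIndexBits&(1<<uint(i)) != 0 {
				count++
			}
		}
		return count
	}

	// missingShards lists the cleared bits, i.e. the absent shard IDs.
	func missingShards(ecIndexBits uint32) []int {
		var missing []int
		for i := 0; i < totalShardsCount; i++ {
			if ecIndexBits&(1<<uint(i)) == 0 {
				missing = append(missing, i)
			}
		}
		return missing
	}

	func main() {
		bits := uint32(0b00000000101011) // shards 0, 1, 3, 5 present
		fmt.Println(shardCount(bits))    // 4
		fmt.Println(missingShards(bits)) // [2 4 6 7 8 9 10 11 12 13]
	}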

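Note also that volume completeness in GetClusterEcShards is a cluster-wide property: one volume's shards are spread across servers, so per-server bitmaps must be merged before deciding anything is missing. A hedged sketch of that aggregation (server names and bitmaps are made up for illustration):

	package main

	import "fmt"

	const totalShardsCount = 14 // 10 data + 4 parity shards per EC volume

	func main() {
		// Hypothetical per-server EcIndexBits for a single volume.
		perServerBits := map[string]uint32{
			"srv1:8080": 0b00000001111111, // shards 0-6
			"srv2:8080": 0b01111110000000, // shards 7-12
			"srv3:8080": 0b10000000000000, // shard 13
		}

		// OR the bitmaps together: a shard is present if any server has it.
		var combined uint32
		for _, bits := range perServerBits {
			combined |= bits
		}

		fullMask := uint32(1<<totalShardsCount) - 1
		fmt.Println(combined == fullMask) // true: every shard 0-13 exists somewhere
	}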