diff options
Diffstat (limited to 'weed/admin/dash/admin_server.go')
| -rw-r--r-- | weed/admin/dash/admin_server.go | 1146 |
1 files changed, 1146 insertions, 0 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go new file mode 100644 index 000000000..fe37f5bb7 --- /dev/null +++ b/weed/admin/dash/admin_server.go @@ -0,0 +1,1146 @@ +package dash + +import ( + "context" + "fmt" + "net/http" + "os" + "sort" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +type AdminServer struct { + masterAddress string + templateFS http.FileSystem + grpcDialOption grpc.DialOption + cacheExpiration time.Duration + lastCacheUpdate time.Time + cachedTopology *ClusterTopology + + // Filer discovery and caching + cachedFilers []string + lastFilerUpdate time.Time + filerCacheExpiration time.Duration +} + +type ClusterTopology struct { + Masters []MasterNode `json:"masters"` + DataCenters []DataCenter `json:"datacenters"` + VolumeServers []VolumeServer `json:"volume_servers"` + TotalVolumes int `json:"total_volumes"` + TotalFiles int64 `json:"total_files"` + TotalSize int64 `json:"total_size"` + UpdatedAt time.Time `json:"updated_at"` +} + +type MasterNode struct { + Address string `json:"address"` + IsLeader bool `json:"is_leader"` + Status string `json:"status"` +} + +type DataCenter struct { + ID string `json:"id"` + Racks []Rack `json:"racks"` +} + +type Rack struct { + ID string `json:"id"` + Nodes []VolumeServer `json:"nodes"` +} + +type VolumeServer struct { + ID string `json:"id"` + Address string `json:"address"` + DataCenter string `json:"datacenter"` + Rack string `json:"rack"` + PublicURL string `json:"public_url"` + Volumes int `json:"volumes"` + MaxVolumes int `json:"max_volumes"` + DiskUsage int64 `json:"disk_usage"` + DiskCapacity int64 `json:"disk_capacity"` + LastHeartbeat time.Time `json:"last_heartbeat"` + Status string `json:"status"` +} + +// S3 Bucket management structures +type S3Bucket struct { + Name string `json:"name"` + CreatedAt time.Time `json:"created_at"` + Size int64 `json:"size"` + ObjectCount int64 `json:"object_count"` + LastModified time.Time `json:"last_modified"` + Status string `json:"status"` +} + +type S3Object struct { + Key string `json:"key"` + Size int64 `json:"size"` + LastModified time.Time `json:"last_modified"` + ETag string `json:"etag"` + StorageClass string `json:"storage_class"` +} + +type BucketDetails struct { + Bucket S3Bucket `json:"bucket"` + Objects []S3Object `json:"objects"` + TotalSize int64 `json:"total_size"` + TotalCount int64 `json:"total_count"` + UpdatedAt time.Time `json:"updated_at"` +} + +// Cluster management data structures +type ClusterVolumeServersData struct { + Username string `json:"username"` + VolumeServers []VolumeServer `json:"volume_servers"` + TotalVolumeServers int `json:"total_volume_servers"` + TotalVolumes int `json:"total_volumes"` + TotalCapacity int64 `json:"total_capacity"` + LastUpdated time.Time `json:"last_updated"` +} + +type VolumeInfo struct { + ID int `json:"id"` + Server string `json:"server"` + DataCenter string `json:"datacenter"` + Rack string `json:"rack"` + Collection string `json:"collection"` + Size int64 `json:"size"` + FileCount int64 `json:"file_count"` + Replication string `json:"replication"` + Status string `json:"status"` +} + +type ClusterVolumesData struct { + Username string `json:"username"` + Volumes []VolumeInfo `json:"volumes"` + TotalVolumes int `json:"total_volumes"` + TotalSize int64 `json:"total_size"` + LastUpdated time.Time `json:"last_updated"` + + // Pagination + CurrentPage int `json:"current_page"` + TotalPages int `json:"total_pages"` + PageSize int `json:"page_size"` + + // Sorting + SortBy string `json:"sort_by"` + SortOrder string `json:"sort_order"` +} + +type CollectionInfo struct { + Name string `json:"name"` + DataCenter string `json:"datacenter"` + Replication string `json:"replication"` + VolumeCount int `json:"volume_count"` + FileCount int64 `json:"file_count"` + TotalSize int64 `json:"total_size"` + TTL string `json:"ttl"` + DiskType string `json:"disk_type"` + Status string `json:"status"` +} + +type ClusterCollectionsData struct { + Username string `json:"username"` + Collections []CollectionInfo `json:"collections"` + TotalCollections int `json:"total_collections"` + TotalVolumes int `json:"total_volumes"` + TotalFiles int64 `json:"total_files"` + TotalSize int64 `json:"total_size"` + LastUpdated time.Time `json:"last_updated"` +} + +type MasterInfo struct { + Address string `json:"address"` + IsLeader bool `json:"is_leader"` + Status string `json:"status"` + Suffrage string `json:"suffrage"` +} + +type ClusterMastersData struct { + Username string `json:"username"` + Masters []MasterInfo `json:"masters"` + TotalMasters int `json:"total_masters"` + LeaderCount int `json:"leader_count"` + LastUpdated time.Time `json:"last_updated"` +} + +type FilerInfo struct { + Address string `json:"address"` + DataCenter string `json:"datacenter"` + Rack string `json:"rack"` + Version string `json:"version"` + CreatedAt time.Time `json:"created_at"` + Status string `json:"status"` +} + +type ClusterFilersData struct { + Username string `json:"username"` + Filers []FilerInfo `json:"filers"` + TotalFilers int `json:"total_filers"` + LastUpdated time.Time `json:"last_updated"` +} + +func NewAdminServer(masterAddress string, templateFS http.FileSystem) *AdminServer { + return &AdminServer{ + masterAddress: masterAddress, + templateFS: templateFS, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), + cacheExpiration: 10 * time.Second, + filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds + } +} + +// GetFilerAddress returns a filer address, discovering from masters if needed +func (s *AdminServer) GetFilerAddress() string { + // Discover filers from masters + filers := s.getDiscoveredFilers() + if len(filers) > 0 { + return filers[0] // Return the first available filer + } + + return "" +} + +// getDiscoveredFilers returns cached filers or discovers them from masters +func (s *AdminServer) getDiscoveredFilers() []string { + // Check if cache is still valid + if time.Since(s.lastFilerUpdate) < s.filerCacheExpiration && len(s.cachedFilers) > 0 { + return s.cachedFilers + } + + // Discover filers from masters + var filers []string + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + if err != nil { + return err + } + + for _, node := range resp.ClusterNodes { + filers = append(filers, node.Address) + } + + return nil + }) + + if err != nil { + glog.Warningf("Failed to discover filers from master %s: %v", s.masterAddress, err) + // Return cached filers even if expired, better than nothing + return s.cachedFilers + } + + // Update cache + s.cachedFilers = filers + s.lastFilerUpdate = time.Now() + + return filers +} + +// WithMasterClient executes a function with a master client connection +func (s *AdminServer) WithMasterClient(f func(client master_pb.SeaweedClient) error) error { + masterAddr := pb.ServerAddress(s.masterAddress) + + return pb.WithMasterClient(false, masterAddr, s.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + return f(client) + }) +} + +// WithFilerClient executes a function with a filer client connection +func (s *AdminServer) WithFilerClient(f func(client filer_pb.SeaweedFilerClient) error) error { + filerAddr := s.GetFilerAddress() + if filerAddr == "" { + return fmt.Errorf("no filer available") + } + + return pb.WithGrpcFilerClient(false, 0, pb.ServerAddress(filerAddr), s.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return f(client) + }) +} + +// WithVolumeServerClient executes a function with a volume server client connection +func (s *AdminServer) WithVolumeServerClient(address pb.ServerAddress, f func(client volume_server_pb.VolumeServerClient) error) error { + return operation.WithVolumeServerClient(false, address, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return f(client) + }) +} + +// GetClusterTopology returns the current cluster topology with caching +func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) { + now := time.Now() + if s.cachedTopology != nil && now.Sub(s.lastCacheUpdate) < s.cacheExpiration { + return s.cachedTopology, nil + } + + topology := &ClusterTopology{ + UpdatedAt: now, + } + + // Use gRPC only + err := s.getTopologyViaGRPC(topology) + if err != nil { + glog.Errorf("Failed to connect to master server %s: %v", s.masterAddress, err) + return nil, fmt.Errorf("gRPC topology request failed: %v", err) + } + + // Cache the result + s.cachedTopology = topology + s.lastCacheUpdate = now + + return topology, nil +} + +// getTopologyViaGRPC gets topology using gRPC (original method) +func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error { + // Get cluster status from master + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + glog.Errorf("Failed to get volume list from master %s: %v", s.masterAddress, err) + return err + } + + if resp.TopologyInfo != nil { + // Process gRPC response + for _, dc := range resp.TopologyInfo.DataCenterInfos { + dataCenter := DataCenter{ + ID: dc.Id, + Racks: []Rack{}, + } + + for _, rack := range dc.RackInfos { + rackObj := Rack{ + ID: rack.Id, + Nodes: []VolumeServer{}, + } + + for _, node := range rack.DataNodeInfos { + // Calculate totals from disk infos + var totalVolumes int64 + var totalMaxVolumes int64 + var totalSize int64 + var totalFiles int64 + + for _, diskInfo := range node.DiskInfos { + totalVolumes += diskInfo.VolumeCount + totalMaxVolumes += diskInfo.MaxVolumeCount + + // Sum up individual volume information + for _, volInfo := range diskInfo.VolumeInfos { + totalSize += int64(volInfo.Size) + totalFiles += int64(volInfo.FileCount) + } + } + + vs := VolumeServer{ + ID: node.Id, + Address: node.Id, + DataCenter: dc.Id, + Rack: rack.Id, + PublicURL: node.Id, + Volumes: int(totalVolumes), + MaxVolumes: int(totalMaxVolumes), + DiskUsage: totalSize, + DiskCapacity: totalMaxVolumes * int64(resp.VolumeSizeLimitMb) * 1024 * 1024, + LastHeartbeat: time.Now(), + Status: "active", + } + + rackObj.Nodes = append(rackObj.Nodes, vs) + topology.VolumeServers = append(topology.VolumeServers, vs) + topology.TotalVolumes += vs.Volumes + topology.TotalFiles += totalFiles + topology.TotalSize += totalSize + } + + dataCenter.Racks = append(dataCenter.Racks, rackObj) + } + + topology.DataCenters = append(topology.DataCenters, dataCenter) + } + } + + return nil + }) + + return err +} + +// InvalidateCache forces a refresh of cached data +func (s *AdminServer) InvalidateCache() { + s.lastCacheUpdate = time.Time{} + s.cachedTopology = nil + s.lastFilerUpdate = time.Time{} + s.cachedFilers = nil +} + +// GetS3Buckets retrieves all S3 buckets from the filer and collects size/object data from collections +func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) { + var buckets []S3Bucket + + // Build a map of collection name to collection data + collectionMap := make(map[string]struct { + Size int64 + FileCount int64 + }) + + // Collect volume information by collection + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + for _, volInfo := range diskInfo.VolumeInfos { + collection := volInfo.Collection + if collection == "" { + collection = "default" + } + + if _, exists := collectionMap[collection]; !exists { + collectionMap[collection] = struct { + Size int64 + FileCount int64 + }{} + } + + data := collectionMap[collection] + data.Size += int64(volInfo.Size) + data.FileCount += int64(volInfo.FileCount) + collectionMap[collection] = data + } + } + } + } + } + } + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to get volume information: %v", err) + } + + // Get filer configuration to determine FilerGroup + var filerGroup string + err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + configResp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + glog.Warningf("Failed to get filer configuration: %v", err) + // Continue without filer group + return nil + } + filerGroup = configResp.FilerGroup + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to get filer configuration: %v", err) + } + + // Now list buckets from the filer and match with collection data + err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + // List buckets by looking at the /buckets directory + stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: "/buckets", + Prefix: "", + StartFromFileName: "", + InclusiveStartFrom: false, + Limit: 1000, + }) + if err != nil { + return err + } + + for { + resp, err := stream.Recv() + if err != nil { + if err.Error() == "EOF" { + break + } + return err + } + + if resp.Entry.IsDirectory { + bucketName := resp.Entry.Name + + // Determine collection name for this bucket + var collectionName string + if filerGroup != "" { + collectionName = fmt.Sprintf("%s_%s", filerGroup, bucketName) + } else { + collectionName = bucketName + } + + // Get size and object count from collection data + var size int64 + var objectCount int64 + if collectionData, exists := collectionMap[collectionName]; exists { + size = collectionData.Size + objectCount = collectionData.FileCount + } + + bucket := S3Bucket{ + Name: bucketName, + CreatedAt: time.Unix(resp.Entry.Attributes.Crtime, 0), + Size: size, + ObjectCount: objectCount, + LastModified: time.Unix(resp.Entry.Attributes.Mtime, 0), + Status: "active", + } + buckets = append(buckets, bucket) + } + } + + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to list S3 buckets: %v", err) + } + + return buckets, nil +} + +// GetBucketDetails retrieves detailed information about a specific bucket +func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error) { + bucketPath := fmt.Sprintf("/buckets/%s", bucketName) + + details := &BucketDetails{ + Bucket: S3Bucket{ + Name: bucketName, + Status: "active", + }, + Objects: []S3Object{}, + UpdatedAt: time.Now(), + } + + err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + // Get bucket info + bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: "/buckets", + Name: bucketName, + }) + if err != nil { + return fmt.Errorf("bucket not found: %v", err) + } + + details.Bucket.CreatedAt = time.Unix(bucketResp.Entry.Attributes.Crtime, 0) + details.Bucket.LastModified = time.Unix(bucketResp.Entry.Attributes.Mtime, 0) + + // List objects in bucket (recursively) + return s.listBucketObjects(client, bucketPath, "", details) + }) + + if err != nil { + return nil, err + } + + return details, nil +} + +// listBucketObjects recursively lists all objects in a bucket +func (s *AdminServer) listBucketObjects(client filer_pb.SeaweedFilerClient, directory, prefix string, details *BucketDetails) error { + stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{ + Directory: directory, + Prefix: prefix, + StartFromFileName: "", + InclusiveStartFrom: false, + Limit: 1000, + }) + if err != nil { + return err + } + + for { + resp, err := stream.Recv() + if err != nil { + if err.Error() == "EOF" { + break + } + return err + } + + entry := resp.Entry + if entry.IsDirectory { + // Recursively list subdirectories + subDir := fmt.Sprintf("%s/%s", directory, entry.Name) + err := s.listBucketObjects(client, subDir, "", details) + if err != nil { + return err + } + } else { + // Add file object + objectKey := entry.Name + if directory != fmt.Sprintf("/buckets/%s", details.Bucket.Name) { + // Remove bucket prefix to get relative path + relativePath := directory[len(fmt.Sprintf("/buckets/%s", details.Bucket.Name))+1:] + objectKey = fmt.Sprintf("%s/%s", relativePath, entry.Name) + } + + obj := S3Object{ + Key: objectKey, + Size: int64(entry.Attributes.FileSize), + LastModified: time.Unix(entry.Attributes.Mtime, 0), + ETag: "", // Could be calculated from chunks if needed + StorageClass: "STANDARD", + } + + details.Objects = append(details.Objects, obj) + details.TotalSize += obj.Size + details.TotalCount++ + } + } + + // Update bucket totals + details.Bucket.Size = details.TotalSize + details.Bucket.ObjectCount = details.TotalCount + + return nil +} + +// CreateS3Bucket creates a new S3 bucket +func (s *AdminServer) CreateS3Bucket(bucketName string) error { + return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + // First ensure /buckets directory exists + _, err := client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/", + Entry: &filer_pb.Entry{ + Name: "buckets", + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + FileMode: uint32(0755 | os.ModeDir), // Directory mode + Uid: uint32(1000), + Gid: uint32(1000), + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + TtlSec: 0, + }, + }, + }) + // Ignore error if directory already exists + if err != nil && !strings.Contains(err.Error(), "already exists") && !strings.Contains(err.Error(), "existing entry") { + return fmt.Errorf("failed to create /buckets directory: %v", err) + } + + // Check if bucket already exists + _, err = client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: "/buckets", + Name: bucketName, + }) + if err == nil { + return fmt.Errorf("bucket %s already exists", bucketName) + } + + // Create bucket directory under /buckets + _, err = client.CreateEntry(context.Background(), &filer_pb.CreateEntryRequest{ + Directory: "/buckets", + Entry: &filer_pb.Entry{ + Name: bucketName, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + FileMode: uint32(0755 | os.ModeDir), // Directory mode + Uid: uint32(1000), + Gid: uint32(1000), + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + TtlSec: 0, + }, + }, + }) + if err != nil { + return fmt.Errorf("failed to create bucket directory: %v", err) + } + + return nil + }) +} + +// DeleteS3Bucket deletes an S3 bucket and all its contents +func (s *AdminServer) DeleteS3Bucket(bucketName string) error { + return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + // Delete bucket directory recursively + _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{ + Directory: "/buckets", + Name: bucketName, + IsDeleteData: true, + IsRecursive: true, + IgnoreRecursiveError: false, + }) + if err != nil { + return fmt.Errorf("failed to delete bucket: %v", err) + } + + return nil + }) +} + +// GetObjectStoreUsers retrieves object store users data +func (s *AdminServer) GetObjectStoreUsers() ([]ObjectStoreUser, error) { + // For now, return mock data since SeaweedFS doesn't have built-in user management + // In a real implementation, this would query the IAM system or user database + users := []ObjectStoreUser{ + { + Username: "admin", + Email: "admin@example.com", + AccessKey: "AKIAIOSFODNN7EXAMPLE", + SecretKey: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", + Status: "active", + CreatedAt: time.Now().AddDate(0, -1, 0), + LastLogin: time.Now().AddDate(0, 0, -1), + Permissions: []string{"s3:*", "iam:*"}, + }, + { + Username: "readonly", + Email: "readonly@example.com", + AccessKey: "AKIAI44QH8DHBEXAMPLE", + SecretKey: "je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY", + Status: "active", + CreatedAt: time.Now().AddDate(0, -2, 0), + LastLogin: time.Now().AddDate(0, 0, -3), + Permissions: []string{"s3:GetObject", "s3:ListBucket"}, + }, + { + Username: "backup", + Email: "backup@example.com", + AccessKey: "AKIAIGCEVSQ6C2EXAMPLE", + SecretKey: "BnL1dIqRF/+WoWcouZ5e3qthJhEXAMPLEKEY", + Status: "inactive", + CreatedAt: time.Now().AddDate(0, -3, 0), + LastLogin: time.Now().AddDate(0, -1, -15), + Permissions: []string{"s3:PutObject", "s3:GetObject"}, + }, + } + + return users, nil +} + +// GetClusterVolumeServers retrieves cluster volume servers data +func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, error) { + topology, err := s.GetClusterTopology() + if err != nil { + return nil, err + } + + var totalCapacity int64 + var totalVolumes int + for _, vs := range topology.VolumeServers { + totalCapacity += vs.DiskCapacity + totalVolumes += vs.Volumes + } + + return &ClusterVolumeServersData{ + VolumeServers: topology.VolumeServers, + TotalVolumeServers: len(topology.VolumeServers), + TotalVolumes: totalVolumes, + TotalCapacity: totalCapacity, + LastUpdated: time.Now(), + }, nil +} + +// GetClusterVolumes retrieves cluster volumes data with pagination and sorting +func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, sortOrder string) (*ClusterVolumesData, error) { + // Set defaults + if page < 1 { + page = 1 + } + if pageSize < 1 || pageSize > 1000 { + pageSize = 100 + } + if sortBy == "" { + sortBy = "id" + } + if sortOrder == "" { + sortOrder = "asc" + } + var volumes []VolumeInfo + var totalSize int64 + volumeID := 1 + + // Get detailed volume information via gRPC + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + for _, volInfo := range diskInfo.VolumeInfos { + // Extract collection name from volume info + collectionName := volInfo.Collection + if collectionName == "" { + collectionName = "default" // Default collection for volumes without explicit collection + } + + volume := VolumeInfo{ + ID: volumeID, + Server: node.Id, + DataCenter: dc.Id, + Rack: rack.Id, + Collection: collectionName, + Size: int64(volInfo.Size), + FileCount: int64(volInfo.FileCount), + Replication: fmt.Sprintf("%03d", volInfo.ReplicaPlacement), + Status: "active", + } + volumes = append(volumes, volume) + totalSize += volume.Size + volumeID++ + } + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Sort volumes + s.sortVolumes(volumes, sortBy, sortOrder) + + // Calculate pagination + totalVolumes := len(volumes) + totalPages := (totalVolumes + pageSize - 1) / pageSize + if totalPages == 0 { + totalPages = 1 + } + + // Apply pagination + startIndex := (page - 1) * pageSize + endIndex := startIndex + pageSize + if startIndex >= totalVolumes { + volumes = []VolumeInfo{} + } else { + if endIndex > totalVolumes { + endIndex = totalVolumes + } + volumes = volumes[startIndex:endIndex] + } + + return &ClusterVolumesData{ + Volumes: volumes, + TotalVolumes: totalVolumes, + TotalSize: totalSize, + LastUpdated: time.Now(), + CurrentPage: page, + TotalPages: totalPages, + PageSize: pageSize, + SortBy: sortBy, + SortOrder: sortOrder, + }, nil +} + +// sortVolumes sorts the volumes slice based on the specified field and order +func (s *AdminServer) sortVolumes(volumes []VolumeInfo, sortBy string, sortOrder string) { + sort.Slice(volumes, func(i, j int) bool { + var less bool + + switch sortBy { + case "id": + less = volumes[i].ID < volumes[j].ID + case "server": + less = volumes[i].Server < volumes[j].Server + case "datacenter": + less = volumes[i].DataCenter < volumes[j].DataCenter + case "rack": + less = volumes[i].Rack < volumes[j].Rack + case "collection": + less = volumes[i].Collection < volumes[j].Collection + case "size": + less = volumes[i].Size < volumes[j].Size + case "filecount": + less = volumes[i].FileCount < volumes[j].FileCount + case "replication": + less = volumes[i].Replication < volumes[j].Replication + case "status": + less = volumes[i].Status < volumes[j].Status + default: + less = volumes[i].ID < volumes[j].ID + } + + if sortOrder == "desc" { + return !less + } + return less + }) +} + +// GetClusterCollections retrieves cluster collections data +func (s *AdminServer) GetClusterCollections() (*ClusterCollectionsData, error) { + var collections []CollectionInfo + var totalVolumes int + var totalFiles int64 + var totalSize int64 + collectionMap := make(map[string]*CollectionInfo) + + // Get actual collection information from volume data + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + if err != nil { + return err + } + + if resp.TopologyInfo != nil { + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, node := range rack.DataNodeInfos { + for _, diskInfo := range node.DiskInfos { + for _, volInfo := range diskInfo.VolumeInfos { + // Extract collection name from volume info + collectionName := volInfo.Collection + if collectionName == "" { + collectionName = "default" // Default collection for volumes without explicit collection + } + + // Get or create collection info + if collection, exists := collectionMap[collectionName]; exists { + collection.VolumeCount++ + collection.FileCount += int64(volInfo.FileCount) + collection.TotalSize += int64(volInfo.Size) + + // Update data center if this collection spans multiple DCs + if collection.DataCenter != dc.Id && collection.DataCenter != "multi" { + collection.DataCenter = "multi" + } + + totalVolumes++ + totalFiles += int64(volInfo.FileCount) + totalSize += int64(volInfo.Size) + } else { + // Format TTL properly + var ttlStr string + if volInfo.Ttl > 0 { + ttlStr = fmt.Sprintf("%ds", volInfo.Ttl) + } else { + ttlStr = "" + } + + newCollection := CollectionInfo{ + Name: collectionName, + DataCenter: dc.Id, + Replication: fmt.Sprintf("%03d", volInfo.ReplicaPlacement), + VolumeCount: 1, + FileCount: int64(volInfo.FileCount), + TotalSize: int64(volInfo.Size), + TTL: ttlStr, + DiskType: "hdd", // Default disk type + Status: "active", + } + collectionMap[collectionName] = &newCollection + totalVolumes++ + totalFiles += int64(volInfo.FileCount) + totalSize += int64(volInfo.Size) + } + } + } + } + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Convert map to slice + for _, collection := range collectionMap { + collections = append(collections, *collection) + } + + // If no collections found, show a message indicating no collections exist + if len(collections) == 0 { + // Return empty collections data instead of creating fake ones + return &ClusterCollectionsData{ + Collections: []CollectionInfo{}, + TotalCollections: 0, + TotalVolumes: 0, + TotalFiles: 0, + TotalSize: 0, + LastUpdated: time.Now(), + }, nil + } + + return &ClusterCollectionsData{ + Collections: collections, + TotalCollections: len(collections), + TotalVolumes: totalVolumes, + TotalFiles: totalFiles, + TotalSize: totalSize, + LastUpdated: time.Now(), + }, nil +} + +// GetClusterMasters retrieves cluster masters data +func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) { + var masters []MasterInfo + var leaderCount int + + // First, get master information from topology + topology, err := s.GetClusterTopology() + if err != nil { + return nil, err + } + + // Create a map to merge topology and raft data + masterMap := make(map[string]*MasterInfo) + + // Add masters from topology + for _, master := range topology.Masters { + masterInfo := &MasterInfo{ + Address: master.Address, + IsLeader: master.IsLeader, + Status: master.Status, + Suffrage: "", + } + + if master.IsLeader { + leaderCount++ + } + + masterMap[master.Address] = masterInfo + } + + // Then, get additional master information from Raft cluster + err = s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{}) + if err != nil { + return err + } + + // Process each raft server + for _, server := range resp.ClusterServers { + address := server.Address + + // Update existing master info or create new one + if masterInfo, exists := masterMap[address]; exists { + // Update existing master with raft data + masterInfo.IsLeader = server.IsLeader + masterInfo.Suffrage = server.Suffrage + masterInfo.Status = "active" // If it's in raft cluster, it's active + } else { + // Create new master info from raft data + masterInfo := &MasterInfo{ + Address: address, + IsLeader: server.IsLeader, + Status: "active", + Suffrage: server.Suffrage, + } + masterMap[address] = masterInfo + } + + if server.IsLeader { + // Update leader count based on raft data + leaderCount = 1 // There should only be one leader + } + } + + return nil + }) + + if err != nil { + // If gRPC call fails, log the error but continue with topology data + glog.Errorf("Failed to get raft cluster servers from master %s: %v", s.masterAddress, err) + } + + // Convert map to slice + for _, masterInfo := range masterMap { + masters = append(masters, *masterInfo) + } + + // If no masters found at all, add the configured master as fallback + if len(masters) == 0 { + masters = append(masters, MasterInfo{ + Address: s.masterAddress, + IsLeader: true, + Status: "active", + Suffrage: "Voter", + }) + leaderCount = 1 + } + + return &ClusterMastersData{ + Masters: masters, + TotalMasters: len(masters), + LeaderCount: leaderCount, + LastUpdated: time.Now(), + }, nil +} + +// GetClusterFilers retrieves cluster filers data +func (s *AdminServer) GetClusterFilers() (*ClusterFilersData, error) { + var filers []FilerInfo + + // Get filer information from master using ListClusterNodes + err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + }) + if err != nil { + return err + } + + // Process each filer node + for _, node := range resp.ClusterNodes { + createdAt := time.Unix(0, node.CreatedAtNs) + + filerInfo := FilerInfo{ + Address: node.Address, + DataCenter: node.DataCenter, + Rack: node.Rack, + Version: node.Version, + CreatedAt: createdAt, + Status: "active", // If it's in the cluster list, it's considered active + } + + filers = append(filers, filerInfo) + } + + return nil + }) + + if err != nil { + return nil, fmt.Errorf("failed to get filer nodes from master: %v", err) + } + + return &ClusterFilersData{ + Filers: filers, + TotalFilers: len(filers), + LastUpdated: time.Now(), + }, nil +} + +// GetAllFilers returns all discovered filers +func (s *AdminServer) GetAllFilers() []string { + return s.getDiscoveredFilers() +} |
