Diffstat (limited to 'weed/topology/data_node.go')
| -rw-r--r-- | weed/topology/data_node.go | 232 |
1 file changed, 153 insertions, 79 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 617341e54..69f739dd5 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,12 +2,11 @@ package topology
 
 import (
     "fmt"
-    "strconv"
-    "sync"
-
-    "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-    "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
     "github.com/chrislusf/seaweedfs/weed/storage/needle"
+    "github.com/chrislusf/seaweedfs/weed/storage/types"
+    "github.com/chrislusf/seaweedfs/weed/util"
+    "strconv"
 
     "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
     "github.com/chrislusf/seaweedfs/weed/storage"
@@ -15,122 +14,161 @@ import (
 
 type DataNode struct {
     NodeImpl
-    volumes      map[needle.VolumeId]storage.VolumeInfo
-    Ip           string
-    Port         int
-    PublicUrl    string
-    LastSeen     int64 // unix time in seconds
-    ecShards     map[needle.VolumeId]*erasure_coding.EcVolumeInfo
-    ecShardsLock sync.RWMutex
+    Ip        string
+    Port      int
+    PublicUrl string
+    LastSeen  int64 // unix time in seconds
 }
 
 func NewDataNode(id string) *DataNode {
-    s := &DataNode{}
-    s.id = NodeId(id)
-    s.nodeType = "DataNode"
-    s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
-    s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
-    s.NodeImpl.value = s
-    return s
+    dn := &DataNode{}
+    dn.id = NodeId(id)
+    dn.nodeType = "DataNode"
+    dn.diskUsages = newDiskUsages()
+    dn.children = make(map[NodeId]Node)
+    dn.NodeImpl.value = dn
+    return dn
 }
 
 func (dn *DataNode) String() string {
     dn.RLock()
     defer dn.RUnlock()
-    return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
+    return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
 }
 
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
     dn.Lock()
     defer dn.Unlock()
-    if oldV, ok := dn.volumes[v.Id]; !ok {
-        dn.volumes[v.Id] = v
-        dn.UpAdjustVolumeCountDelta(1)
-        if v.IsRemote() {
-            dn.UpAdjustRemoteVolumeCountDelta(1)
-        }
-        if !v.ReadOnly {
-            dn.UpAdjustActiveVolumeCountDelta(1)
-        }
-        dn.UpAdjustMaxVolumeId(v.Id)
-        isNew = true
-    } else {
-        if oldV.IsRemote() != v.IsRemote() {
-            if v.IsRemote() {
-                dn.UpAdjustRemoteVolumeCountDelta(1)
-            }
-            if oldV.IsRemote() {
-                dn.UpAdjustRemoteVolumeCountDelta(-1)
-            }
-        }
-        dn.volumes[v.Id] = v
+    return dn.doAddOrUpdateVolume(v)
+}
+
+func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
+    c, found := dn.children[NodeId(diskType)]
+    if !found {
+        c = NewDisk(diskType)
+        dn.doLinkChildNode(c)
     }
-    return
+    disk := c.(*Disk)
+    return disk
+}
+
+func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
+    disk := dn.getOrCreateDisk(v.DiskType)
+    return disk.AddOrUpdateVolume(v)
 }
 
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
+// UpdateVolumes detects new/deleted/changed volumes on a volume server
+// used in master to notify master clients of these changes.
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
+
     actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
     for _, v := range actualVolumes {
         actualVolumeMap[v.Id] = v
     }
+
     dn.Lock()
-    for vid, v := range dn.volumes {
+    defer dn.Unlock()
+
+    existingVolumes := dn.getVolumes()
+
+    for _, v := range existingVolumes {
+        vid := v.Id
         if _, ok := actualVolumeMap[vid]; !ok {
             glog.V(0).Infoln("Deleting volume id:", vid)
-            delete(dn.volumes, vid)
+            disk := dn.getOrCreateDisk(v.DiskType)
+            delete(disk.volumes, vid)
             deletedVolumes = append(deletedVolumes, v)
-            dn.UpAdjustVolumeCountDelta(-1)
+
+            deltaDiskUsages := newDiskUsages()
+            deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
+            deltaDiskUsage.volumeCount = -1
             if v.IsRemote() {
-                dn.UpAdjustRemoteVolumeCountDelta(-1)
+                deltaDiskUsage.remoteVolumeCount = -1
             }
             if !v.ReadOnly {
-                dn.UpAdjustActiveVolumeCountDelta(-1)
+                deltaDiskUsage.activeVolumeCount = -1
            }
+            disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
        }
     }
-    dn.Unlock()
     for _, v := range actualVolumes {
-        isNew := dn.AddOrUpdateVolume(v)
+        isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
         if isNew {
             newVolumes = append(newVolumes, v)
         }
+        if isChangedRO {
+            changeRO = append(changeRO, v)
+        }
     }
     return
 }
 
-func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) {
+func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
     dn.Lock()
+    defer dn.Unlock()
+
     for _, v := range deletedVolumes {
-        delete(dn.volumes, v.Id)
-        dn.UpAdjustVolumeCountDelta(-1)
+        disk := dn.getOrCreateDisk(v.DiskType)
+        delete(disk.volumes, v.Id)
+
+        deltaDiskUsages := newDiskUsages()
+        deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
+        deltaDiskUsage.volumeCount = -1
        if v.IsRemote() {
-            dn.UpAdjustRemoteVolumeCountDelta(-1)
+            deltaDiskUsage.remoteVolumeCount = -1
        }
        if !v.ReadOnly {
-            dn.UpAdjustActiveVolumeCountDelta(-1)
+            deltaDiskUsage.activeVolumeCount = -1
        }
+        disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
    }
-    dn.Unlock()
-    for _, v := range newlVolumes {
-        dn.AddOrUpdateVolume(v)
+    for _, v := range newVolumes {
+        dn.doAddOrUpdateVolume(v)
    }
    return
 }
 
+func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
+    deltaDiskUsages := newDiskUsages()
+    for diskType, maxVolumeCount := range maxVolumeCounts {
+        if maxVolumeCount == 0 {
+            // the volume server may have set the max to zero
+            continue
+        }
+        dt := types.ToDiskType(diskType)
+        currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
+        if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) {
+            continue
+        }
+        disk := dn.getOrCreateDisk(dt.String())
+        deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
+        deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
+        disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+    }
+}
+
 func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
     dn.RLock()
-    for _, v := range dn.volumes {
-        ret = append(ret, v)
+    for _, c := range dn.children {
+        disk := c.(*Disk)
+        ret = append(ret, disk.GetVolumes()...)
    }
    dn.RUnlock()
    return ret
 }
 
-func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
+func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
     dn.RLock()
     defer dn.RUnlock()
-    vInfo, ok := dn.volumes[id]
-    if ok {
+    found := false
+    for _, c := range dn.children {
+        disk := c.(*Disk)
+        vInfo, found = disk.volumes[id]
+        if found {
+            break
+        }
+    }
+    if found {
        return vInfo, nil
    } else {
        return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
@@ -138,7 +176,10 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, erro
 }
 
 func (dn *DataNode) GetDataCenter() *DataCenter {
-    return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+    rack := dn.Parent()
+    dcNode := rack.Parent()
+    dcValue := dcNode.GetValue()
+    return dcValue.(*DataCenter)
 }
 
 func (dn *DataNode) GetRack() *Rack {
@@ -165,28 +206,61 @@ func (dn *DataNode) Url() string {
 func (dn *DataNode) ToMap() interface{} {
     ret := make(map[string]interface{})
     ret["Url"] = dn.Url()
-    ret["Volumes"] = dn.GetVolumeCount()
-    ret["EcShards"] = dn.GetEcShardCount()
-    ret["Max"] = dn.GetMaxVolumeCount()
-    ret["Free"] = dn.FreeSpace()
     ret["PublicUrl"] = dn.PublicUrl
+
+    // aggregated volume info
+    var volumeCount, ecShardCount, maxVolumeCount int64
+    var volumeIds string
+    for _, diskUsage := range dn.diskUsages.usages {
+        volumeCount += diskUsage.volumeCount
+        ecShardCount += diskUsage.ecShardCount
+        maxVolumeCount += diskUsage.maxVolumeCount
+    }
+
+    for _, disk := range dn.Children() {
+        d := disk.(*Disk)
+        volumeIds += " " + d.GetVolumeIds()
+    }
+
+    ret["Volumes"] = volumeCount
+    ret["EcShards"] = ecShardCount
+    ret["Max"] = maxVolumeCount
+    ret["VolumeIds"] = volumeIds
+
     return ret
 }
 
 func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
     m := &master_pb.DataNodeInfo{
-        Id:                string(dn.Id()),
-        VolumeCount:       uint64(dn.GetVolumeCount()),
-        MaxVolumeCount:    uint64(dn.GetMaxVolumeCount()),
-        FreeVolumeCount:   uint64(dn.FreeSpace()),
-        ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
-        RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
-    }
-    for _, v := range dn.GetVolumes() {
-        m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
+        Id:        string(dn.Id()),
+        DiskInfos: make(map[string]*master_pb.DiskInfo),
    }
-    for _, ecv := range dn.GetEcShards() {
-        m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
+    for _, c := range dn.Children() {
+        disk := c.(*Disk)
+        m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
    }
    return m
 }
+
+// GetVolumeIds returns the human readable volume ids limited to count of max 100.
+func (dn *DataNode) GetVolumeIds() string {
+    dn.RLock()
+    defer dn.RUnlock()
+    existingVolumes := dn.getVolumes()
+    ids := make([]int, 0, len(existingVolumes))
+
+    for k := range existingVolumes {
+        ids = append(ids, int(k))
+    }
+
+    return util.HumanReadableIntsMax(100, ids...)
+}
+
+func (dn *DataNode) getVolumes() []storage.VolumeInfo {
+    var existingVolumes []storage.VolumeInfo
+    for _, c := range dn.children {
+        disk := c.(*Disk)
+        existingVolumes = append(existingVolumes, disk.GetVolumes()...)
+    }
+    return existingVolumes
+}
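The change above replaces the DataNode's flat volumes map with per-disk-type Disk children and moves the counters into delta-based disk usage accounting (newDiskUsages, getOrCreateDisk, UpAdjustDiskUsageDelta). The standalone Go sketch below only illustrates that grouping-plus-aggregation pattern with simplified stand-in types; the names dataNode, disk, and diskUsage are invented for the example and are not the actual SeaweedFS topology types.

package main

import "fmt"

// diskUsage is a pared-down stand-in for the usage counters kept per disk type.
type diskUsage struct {
	volumeCount, activeVolumeCount int64
}

// disk groups volumes of one disk type, similar to the new Disk child node.
type disk struct {
	usage   diskUsage
	volumes map[uint32]bool // volume id -> read-only
}

// dataNode keeps one disk per disk type, like dn.children keyed by NodeId(diskType).
type dataNode struct {
	disks map[string]*disk
}

// getOrCreateDisk mirrors the idea of DataNode.getOrCreateDisk: volumes are
// looked up under a per-disk-type child instead of a single flat map.
func (dn *dataNode) getOrCreateDisk(diskType string) *disk {
	d, found := dn.disks[diskType]
	if !found {
		d = &disk{volumes: make(map[uint32]bool)}
		dn.disks[diskType] = d
	}
	return d
}

// addVolume applies a delta to the owning disk's usage, in the spirit of
// building a delta and calling UpAdjustDiskUsageDelta.
func (dn *dataNode) addVolume(id uint32, diskType string, readOnly bool) {
	d := dn.getOrCreateDisk(diskType)
	if _, exists := d.volumes[id]; exists {
		return
	}
	d.volumes[id] = readOnly
	d.usage.volumeCount++
	if !readOnly {
		d.usage.activeVolumeCount++
	}
}

// totals aggregates across disks, the way ToMap now sums dn.diskUsages.usages.
func (dn *dataNode) totals() (volumes, active int64) {
	for _, d := range dn.disks {
		volumes += d.usage.volumeCount
		active += d.usage.activeVolumeCount
	}
	return
}

func main() {
	dn := &dataNode{disks: make(map[string]*disk)}
	dn.addVolume(1, "hdd", false)
	dn.addVolume(2, "hdd", true)
	dn.addVolume(3, "ssd", false)
	v, a := dn.totals()
	fmt.Printf("volumes=%d active=%d disks=%d\n", v, a, len(dn.disks))
}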

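UpdateVolumes now reports three result sets: newVolumes, deletedVolumes, and changeRO, so the master can also notify clients about volumes whose read-only flag flipped. A minimal self-contained sketch of that reconciliation logic follows; volumeInfo and diffVolumes are hypothetical stand-ins for storage.VolumeInfo and the DataNode method, used only to show the bookkeeping.

package main

import "fmt"

// volumeInfo is a simplified stand-in for storage.VolumeInfo.
type volumeInfo struct {
	Id       uint32
	ReadOnly bool
}

// diffVolumes compares the previously known volumes with the volumes a volume
// server actually reports, returning the added, removed, and read-only-changed sets.
func diffVolumes(existing, actual []volumeInfo) (newVolumes, deletedVolumes, changedRO []volumeInfo) {
	actualMap := make(map[uint32]volumeInfo)
	for _, v := range actual {
		actualMap[v.Id] = v
	}
	existingMap := make(map[uint32]volumeInfo)
	for _, v := range existing {
		existingMap[v.Id] = v
		if _, ok := actualMap[v.Id]; !ok {
			deletedVolumes = append(deletedVolumes, v)
		}
	}
	for _, v := range actual {
		old, ok := existingMap[v.Id]
		if !ok {
			newVolumes = append(newVolumes, v)
			continue
		}
		if old.ReadOnly != v.ReadOnly {
			changedRO = append(changedRO, v)
		}
	}
	return
}

func main() {
	existing := []volumeInfo{{Id: 1}, {Id: 2}, {Id: 3}}
	actual := []volumeInfo{{Id: 2}, {Id: 3, ReadOnly: true}, {Id: 4}}
	n, d, c := diffVolumes(existing, actual)
	fmt.Println("new:", n, "deleted:", d, "changedRO:", c)
}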