diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-02-18 13:57:34 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-02-18 13:57:34 +0800 |
| commit | c8f56f5712c1efffc46de95a8057ed09c21da2db (patch) | |
| tree | bc3330e274901d782395b7396cb54d7cc42608b1 /weed/topology/data_node.go | |
| parent | 12a78335860c4b1e220748e4adc4097050af5272 (diff) | |
| parent | 3575d41009e4367658e75e6ae780c6260b80daf9 (diff) | |
| download | seaweedfs-c8f56f5712c1efffc46de95a8057ed09c21da2db.tar.xz seaweedfs-c8f56f5712c1efffc46de95a8057ed09c21da2db.zip | |
Merge pull request #2 from chrislusf/master
Diffstat (limited to 'weed/topology/data_node.go')
| -rw-r--r-- | weed/topology/data_node.go | 177 |
1 files changed, 101 insertions, 76 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index eaed51654..1a0ebf761 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,13 +2,11 @@ package topology import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/util" - "strconv" - "sync" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" @@ -16,29 +14,26 @@ import ( type DataNode struct { NodeImpl - volumes map[needle.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds - ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo - ecShardsLock sync.RWMutex + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds } func NewDataNode(id string) *DataNode { - s := &DataNode{} - s.id = NodeId(id) - s.nodeType = "DataNode" - s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) - s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) - s.NodeImpl.value = s - return s + dn := &DataNode{} + dn.id = NodeId(id) + dn.nodeType = "DataNode" + dn.diskUsages = newDiskUsages() + dn.children = make(map[NodeId]Node) + dn.NodeImpl.value = dn + return dn } func (dn *DataNode) String() string { dn.RLock() defer dn.RUnlock() - return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) + return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl) } func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { @@ -47,33 +42,23 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO return dn.doAddOrUpdateVolume(v) } -func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { - if oldV, ok := dn.volumes[v.Id]; !ok { - dn.volumes[v.Id] = v - dn.UpAdjustVolumeCountDelta(1) - if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(1) - } - if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(1) - } - dn.UpAdjustMaxVolumeId(v.Id) - isNew = true - } else { - if oldV.IsRemote() != v.IsRemote() { - if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(1) - } - if oldV.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) - } - } - isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly - dn.volumes[v.Id] = v +func (dn *DataNode) getOrCreateDisk(diskType string) *Disk { + c, found := dn.children[NodeId(diskType)] + if !found { + c = NewDisk(diskType) + dn.doLinkChildNode(c) } - return + disk := c.(*Disk) + return disk } +func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) { + disk := dn.getOrCreateDisk(v.DiskType) + return disk.AddOrUpdateVolume(v) +} + +// UpdateVolumes detects new/deleted/changed volumes on a volume server +// used in master to notify master clients of these changes. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) { actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) @@ -84,18 +69,26 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume dn.Lock() defer dn.Unlock() - for vid, v := range dn.volumes { + existingVolumes := dn.getVolumes() + + for _, v := range existingVolumes { + vid := v.Id if _, ok := actualVolumeMap[vid]; !ok { glog.V(0).Infoln("Deleting volume id:", vid) - delete(dn.volumes, vid) + disk := dn.getOrCreateDisk(v.DiskType) + delete(disk.volumes, vid) deletedVolumes = append(deletedVolumes, v) - dn.UpAdjustVolumeCountDelta(-1) + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage.volumeCount = -1 if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) + deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + deltaDiskUsage.activeVolumeCount = -1 } + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } } for _, v := range actualVolumes { @@ -115,14 +108,19 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu defer dn.Unlock() for _, v := range deletedVolumes { - delete(dn.volumes, v.Id) - dn.UpAdjustVolumeCountDelta(-1) + disk := dn.getOrCreateDisk(v.DiskType) + delete(disk.volumes, v.Id) + + deltaDiskUsages := newDiskUsages() + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType)) + deltaDiskUsage.volumeCount = -1 if v.IsRemote() { - dn.UpAdjustRemoteVolumeCountDelta(-1) + deltaDiskUsage.remoteVolumeCount = -1 } if !v.ReadOnly { - dn.UpAdjustActiveVolumeCountDelta(-1) + deltaDiskUsage.activeVolumeCount = -1 } + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) } for _, v := range newVolumes { dn.doAddOrUpdateVolume(v) @@ -130,20 +128,47 @@ func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.Volu return } +func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) { + deltaDiskUsages := newDiskUsages() + for diskType, maxVolumeCount := range maxVolumeCounts { + if maxVolumeCount == 0 { + // the volume server may have set the max to zero + continue + } + dt := types.ToDiskType(diskType) + currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt) + if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) { + continue + } + disk := dn.getOrCreateDisk(dt.String()) + deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt) + deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount + disk.UpAdjustDiskUsageDelta(deltaDiskUsages) + } +} + func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() - for _, v := range dn.volumes { - ret = append(ret, v) + for _, c := range dn.children { + disk := c.(*Disk) + ret = append(ret, disk.GetVolumes()...) } dn.RUnlock() return ret } -func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) { dn.RLock() defer dn.RUnlock() - vInfo, ok := dn.volumes[id] - if ok { + found := false + for _, c := range dn.children { + disk := c.(*Disk) + vInfo, found = disk.volumes[id] + if found { + break + } + } + if found { return vInfo, nil } else { return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found") @@ -181,29 +206,19 @@ func (dn *DataNode) Url() string { func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() - ret["Volumes"] = dn.GetVolumeCount() - ret["VolumeIds"] = dn.GetVolumeIds() - ret["EcShards"] = dn.GetEcShardCount() - ret["Max"] = dn.GetMaxVolumeCount() - ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl + ret["Disks"] = dn.diskUsages.ToMap() return ret } func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { m := &master_pb.DataNodeInfo{ - Id: string(dn.Id()), - VolumeCount: uint64(dn.GetVolumeCount()), - MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), - FreeVolumeCount: uint64(dn.FreeSpace()), - ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), - RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()), + Id: string(dn.Id()), + DiskInfos: make(map[string]*master_pb.DiskInfo), } - for _, v := range dn.GetVolumes() { - m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) - } - for _, ecv := range dn.GetEcShards() { - m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + for _, c := range dn.Children() { + disk := c.(*Disk) + m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo() } return m } @@ -212,11 +227,21 @@ func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { func (dn *DataNode) GetVolumeIds() string { dn.RLock() defer dn.RUnlock() - ids := make([]int, 0, len(dn.volumes)) + existingVolumes := dn.getVolumes() + ids := make([]int, 0, len(existingVolumes)) - for k := range dn.volumes { + for k := range existingVolumes { ids = append(ids, int(k)) } return util.HumanReadableIntsMax(100, ids...) } + +func (dn *DataNode) getVolumes() []storage.VolumeInfo { + var existingVolumes []storage.VolumeInfo + for _, c := range dn.children { + disk := c.(*Disk) + existingVolumes = append(existingVolumes, disk.GetVolumes()...) + } + return existingVolumes +} |
