aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/data_node.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/data_node.go')
-rw-r--r--weed/topology/data_node.go232
1 files changed, 153 insertions, 79 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 617341e54..69f739dd5 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,12 +2,11 @@ package topology
import (
"fmt"
- "strconv"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -15,122 +14,161 @@ import (
type DataNode struct {
NodeImpl
- volumes map[needle.VolumeId]storage.VolumeInfo
- Ip string
- Port int
- PublicUrl string
- LastSeen int64 // unix time in seconds
- ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
- ecShardsLock sync.RWMutex
+ Ip string
+ Port int
+ PublicUrl string
+ LastSeen int64 // unix time in seconds
}
func NewDataNode(id string) *DataNode {
- s := &DataNode{}
- s.id = NodeId(id)
- s.nodeType = "DataNode"
- s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
- s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
- s.NodeImpl.value = s
- return s
+ dn := &DataNode{}
+ dn.id = NodeId(id)
+ dn.nodeType = "DataNode"
+ dn.diskUsages = newDiskUsages()
+ dn.children = make(map[NodeId]Node)
+ dn.NodeImpl.value = dn
+ return dn
}
func (dn *DataNode) String() string {
dn.RLock()
defer dn.RUnlock()
- return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
+ return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
}
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
dn.Lock()
defer dn.Unlock()
- if oldV, ok := dn.volumes[v.Id]; !ok {
- dn.volumes[v.Id] = v
- dn.UpAdjustVolumeCountDelta(1)
- if v.IsRemote() {
- dn.UpAdjustRemoteVolumeCountDelta(1)
- }
- if !v.ReadOnly {
- dn.UpAdjustActiveVolumeCountDelta(1)
- }
- dn.UpAdjustMaxVolumeId(v.Id)
- isNew = true
- } else {
- if oldV.IsRemote() != v.IsRemote() {
- if v.IsRemote() {
- dn.UpAdjustRemoteVolumeCountDelta(1)
- }
- if oldV.IsRemote() {
- dn.UpAdjustRemoteVolumeCountDelta(-1)
- }
- }
- dn.volumes[v.Id] = v
+ return dn.doAddOrUpdateVolume(v)
+}
+
+func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
+ c, found := dn.children[NodeId(diskType)]
+ if !found {
+ c = NewDisk(diskType)
+ dn.doLinkChildNode(c)
}
- return
+ disk := c.(*Disk)
+ return disk
+}
+
+func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
+ disk := dn.getOrCreateDisk(v.DiskType)
+ return disk.AddOrUpdateVolume(v)
}
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
+// UpdateVolumes detects new/deleted/changed volumes on a volume server
+// used in master to notify master clients of these changes.
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
+
actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
+
dn.Lock()
- for vid, v := range dn.volumes {
+ defer dn.Unlock()
+
+ existingVolumes := dn.getVolumes()
+
+ for _, v := range existingVolumes {
+ vid := v.Id
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
- delete(dn.volumes, vid)
+ disk := dn.getOrCreateDisk(v.DiskType)
+ delete(disk.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
- dn.UpAdjustVolumeCountDelta(-1)
+
+ deltaDiskUsages := newDiskUsages()
+ deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
+ deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
- dn.UpAdjustRemoteVolumeCountDelta(-1)
+ deltaDiskUsage.remoteVolumeCount = -1
}
if !v.ReadOnly {
- dn.UpAdjustActiveVolumeCountDelta(-1)
+ deltaDiskUsage.activeVolumeCount = -1
}
+ disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
}
- dn.Unlock()
for _, v := range actualVolumes {
- isNew := dn.AddOrUpdateVolume(v)
+ isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
+ if isChangedRO {
+ changeRO = append(changeRO, v)
+ }
}
return
}
-func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) {
+func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
dn.Lock()
+ defer dn.Unlock()
+
for _, v := range deletedVolumes {
- delete(dn.volumes, v.Id)
- dn.UpAdjustVolumeCountDelta(-1)
+ disk := dn.getOrCreateDisk(v.DiskType)
+ delete(disk.volumes, v.Id)
+
+ deltaDiskUsages := newDiskUsages()
+ deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
+ deltaDiskUsage.volumeCount = -1
if v.IsRemote() {
- dn.UpAdjustRemoteVolumeCountDelta(-1)
+ deltaDiskUsage.remoteVolumeCount = -1
}
if !v.ReadOnly {
- dn.UpAdjustActiveVolumeCountDelta(-1)
+ deltaDiskUsage.activeVolumeCount = -1
}
+ disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
}
- dn.Unlock()
- for _, v := range newlVolumes {
- dn.AddOrUpdateVolume(v)
+ for _, v := range newVolumes {
+ dn.doAddOrUpdateVolume(v)
}
return
}
+func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
+ deltaDiskUsages := newDiskUsages()
+ for diskType, maxVolumeCount := range maxVolumeCounts {
+ if maxVolumeCount == 0 {
+ // the volume server may have set the max to zero
+ continue
+ }
+ dt := types.ToDiskType(diskType)
+ currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
+ if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) {
+ continue
+ }
+ disk := dn.getOrCreateDisk(dt.String())
+ deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
+ deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
+ disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
+ }
+}
+
func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
dn.RLock()
- for _, v := range dn.volumes {
- ret = append(ret, v)
+ for _, c := range dn.children {
+ disk := c.(*Disk)
+ ret = append(ret, disk.GetVolumes()...)
}
dn.RUnlock()
return ret
}
-func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
+func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
dn.RLock()
defer dn.RUnlock()
- vInfo, ok := dn.volumes[id]
- if ok {
+ found := false
+ for _, c := range dn.children {
+ disk := c.(*Disk)
+ vInfo, found = disk.volumes[id]
+ if found {
+ break
+ }
+ }
+ if found {
return vInfo, nil
} else {
return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
@@ -138,7 +176,10 @@ func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, erro
}
func (dn *DataNode) GetDataCenter() *DataCenter {
- return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+ rack := dn.Parent()
+ dcNode := rack.Parent()
+ dcValue := dcNode.GetValue()
+ return dcValue.(*DataCenter)
}
func (dn *DataNode) GetRack() *Rack {
@@ -165,28 +206,61 @@ func (dn *DataNode) Url() string {
func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
- ret["Volumes"] = dn.GetVolumeCount()
- ret["EcShards"] = dn.GetEcShardCount()
- ret["Max"] = dn.GetMaxVolumeCount()
- ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl
+
+ // aggregated volume info
+ var volumeCount, ecShardCount, maxVolumeCount int64
+ var volumeIds string
+ for _, diskUsage := range dn.diskUsages.usages {
+ volumeCount += diskUsage.volumeCount
+ ecShardCount += diskUsage.ecShardCount
+ maxVolumeCount += diskUsage.maxVolumeCount
+ }
+
+ for _, disk := range dn.Children() {
+ d := disk.(*Disk)
+ volumeIds += " " + d.GetVolumeIds()
+ }
+
+ ret["Volumes"] = volumeCount
+ ret["EcShards"] = ecShardCount
+ ret["Max"] = maxVolumeCount
+ ret["VolumeIds"] = volumeIds
+
return ret
}
func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
m := &master_pb.DataNodeInfo{
- Id: string(dn.Id()),
- VolumeCount: uint64(dn.GetVolumeCount()),
- MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
- FreeVolumeCount: uint64(dn.FreeSpace()),
- ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
- RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
- }
- for _, v := range dn.GetVolumes() {
- m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
+ Id: string(dn.Id()),
+ DiskInfos: make(map[string]*master_pb.DiskInfo),
}
- for _, ecv := range dn.GetEcShards() {
- m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
+ for _, c := range dn.Children() {
+ disk := c.(*Disk)
+ m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
}
return m
}
+
+// GetVolumeIds returns the human readable volume ids limited to count of max 100.
+func (dn *DataNode) GetVolumeIds() string {
+ dn.RLock()
+ defer dn.RUnlock()
+ existingVolumes := dn.getVolumes()
+ ids := make([]int, 0, len(existingVolumes))
+
+ for k := range existingVolumes {
+ ids = append(ids, int(k))
+ }
+
+ return util.HumanReadableIntsMax(100, ids...)
+}
+
+func (dn *DataNode) getVolumes() []storage.VolumeInfo {
+ var existingVolumes []storage.VolumeInfo
+ for _, c := range dn.children {
+ disk := c.(*Disk)
+ existingVolumes = append(existingVolumes, disk.GetVolumes()...)
+ }
+ return existingVolumes
+}