diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2019-07-16 11:13:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-07-16 11:13:23 +0800 |
| commit | d19bbee98d89ec6cd603572bd9c5d55749610e61 (patch) | |
| tree | 8d760dcee4dfcb4404af90b7d5e64def4549b4cc /weed/topology/data_node.go | |
| parent | 01060c992591f412b0d5e180bde29991747a9462 (diff) | |
| parent | 5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff) | |
| download | seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.tar.xz seaweedfs-d19bbee98d89ec6cd603572bd9c5d55749610e61.zip | |
keep update from original (#1)
keep update from original
Diffstat (limited to 'weed/topology/data_node.go')
| -rw-r--r-- | weed/topology/data_node.go | 57 |
1 files changed, 49 insertions, 8 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 6ea6d3938..3e72ccdbf 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,6 +2,12 @@ package topology import ( "fmt" + "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" @@ -10,18 +16,21 @@ import ( type DataNode struct { NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds + volumes map[needle.VolumeId]storage.VolumeInfo + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds + ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo + ecShardsLock sync.RWMutex } func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) + s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) s.NodeImpl.value = s return s } @@ -50,7 +59,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { - actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } @@ -74,6 +83,20 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume return } +func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) { + dn.Lock() + for _, v := range deletedVolumes { + delete(dn.volumes, v.Id) + dn.UpAdjustVolumeCountDelta(-1) + dn.UpAdjustActiveVolumeCountDelta(-1) + } + dn.Unlock() + for _, v := range newlVolumes { + dn.AddOrUpdateVolume(v) + } + return +} + func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() for _, v := range dn.volumes { @@ -83,7 +106,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { return ret } -func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { dn.RLock() defer dn.RUnlock() vInfo, ok := dn.volumes[id] @@ -123,8 +146,26 @@ func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() ret["Volumes"] = dn.GetVolumeCount() + ret["EcShards"] = dn.GetEcShardCount() ret["Max"] = dn.GetMaxVolumeCount() ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl return ret } + +func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { + m := &master_pb.DataNodeInfo{ + Id: string(dn.Id()), + VolumeCount: uint64(dn.GetVolumeCount()), + MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), + FreeVolumeCount: uint64(dn.FreeSpace()), + ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), + } + for _, v := range dn.GetVolumes() { + m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) + } + for _, ecv := range dn.GetEcShards() { + m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + } + return m +} |
