diff options
Diffstat (limited to 'weed/topology/data_node.go')
| -rw-r--r-- | weed/topology/data_node.go | 57 |
1 files changed, 49 insertions, 8 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index 6ea6d3938..3e72ccdbf 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -2,6 +2,12 @@ package topology import ( "fmt" + "sync" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "strconv" "github.com/chrislusf/seaweedfs/weed/glog" @@ -10,18 +16,21 @@ import ( type DataNode struct { NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds + volumes map[needle.VolumeId]storage.VolumeInfo + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds + ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo + ecShardsLock sync.RWMutex } func NewDataNode(id string) *DataNode { s := &DataNode{} s.id = NodeId(id) s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.volumes = make(map[needle.VolumeId]storage.VolumeInfo) + s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo) s.NodeImpl.value = s return s } @@ -50,7 +59,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) { } func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) { - actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo) for _, v := range actualVolumes { actualVolumeMap[v.Id] = v } @@ -74,6 +83,20 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume return } +func (dn *DataNode) DeltaUpdateVolumes(newlVolumes, deletedVolumes []storage.VolumeInfo) { + dn.Lock() + for _, v := range deletedVolumes { + delete(dn.volumes, v.Id) + dn.UpAdjustVolumeCountDelta(-1) + dn.UpAdjustActiveVolumeCountDelta(-1) + } + dn.Unlock() + for _, v := range newlVolumes { + dn.AddOrUpdateVolume(v) + } + return +} + func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { dn.RLock() for _, v := range dn.volumes { @@ -83,7 +106,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { return ret } -func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) { +func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) { dn.RLock() defer dn.RUnlock() vInfo, ok := dn.volumes[id] @@ -123,8 +146,26 @@ func (dn *DataNode) ToMap() interface{} { ret := make(map[string]interface{}) ret["Url"] = dn.Url() ret["Volumes"] = dn.GetVolumeCount() + ret["EcShards"] = dn.GetEcShardCount() ret["Max"] = dn.GetMaxVolumeCount() ret["Free"] = dn.FreeSpace() ret["PublicUrl"] = dn.PublicUrl return ret } + +func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo { + m := &master_pb.DataNodeInfo{ + Id: string(dn.Id()), + VolumeCount: uint64(dn.GetVolumeCount()), + MaxVolumeCount: uint64(dn.GetMaxVolumeCount()), + FreeVolumeCount: uint64(dn.FreeSpace()), + ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()), + } + for _, v := range dn.GetVolumes() { + m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage()) + } + for _, ecv := range dn.GetEcShards() { + m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage()) + } + return m +} |
