path: root/weed/topology/data_node.go
author     yourchanges <yourchanges@gmail.com>      2020-07-10 09:44:32 +0800
committer  GitHub <noreply@github.com>              2020-07-10 09:44:32 +0800
commit     e67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch)
tree       4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/topology/data_node.go
parent     2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff)
parent     1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff)
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/topology/data_node.go')
-rw-r--r--  weed/topology/data_node.go  107
1 file changed, 94 insertions, 13 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index 6ea6d3938..efdf5285b 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -2,7 +2,13 @@ package topology
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"strconv"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
@@ -10,18 +16,21 @@ import (
type DataNode struct {
NodeImpl
- volumes map[storage.VolumeId]storage.VolumeInfo
- Ip string
- Port int
- PublicUrl string
- LastSeen int64 // unix time in seconds
+ volumes map[needle.VolumeId]storage.VolumeInfo
+ Ip string
+ Port int
+ PublicUrl string
+ LastSeen int64 // unix time in seconds
+ ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
+ ecShardsLock sync.RWMutex
}
func NewDataNode(id string) *DataNode {
s := &DataNode{}
s.id = NodeId(id)
s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+ s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
+ s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
s.NodeImpl.value = s
return s
}
@@ -32,25 +41,37 @@ func (dn *DataNode) String() string {
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
}
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew bool) {
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
dn.Lock()
defer dn.Unlock()
- if _, ok := dn.volumes[v.Id]; !ok {
+ if oldV, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(1)
+ }
if !v.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(1)
}
dn.UpAdjustMaxVolumeId(v.Id)
isNew = true
} else {
+ if oldV.IsRemote() != v.IsRemote() {
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(1)
+ }
+ if oldV.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ }
+ isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly
dn.volumes[v.Id] = v
}
return
}
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes []storage.VolumeInfo) {
- actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
+ actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
@@ -61,15 +82,42 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolume
delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
- dn.UpAdjustActiveVolumeCountDelta(-1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
}
}
dn.Unlock()
for _, v := range actualVolumes {
- isNew := dn.AddOrUpdateVolume(v)
+ isNew, isChangedRO := dn.AddOrUpdateVolume(v)
if isNew {
newVolumes = append(newVolumes, v)
}
+ if isChangedRO {
+ changeRO = append(changeRO, v)
+ }
+ }
+ return
+}
+
+func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
+ dn.Lock()
+ for _, v := range deletedVolumes {
+ delete(dn.volumes, v.Id)
+ dn.UpAdjustVolumeCountDelta(-1)
+ if v.IsRemote() {
+ dn.UpAdjustRemoteVolumeCountDelta(-1)
+ }
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
+ }
+ dn.Unlock()
+ for _, v := range newVolumes {
+ dn.AddOrUpdateVolume(v)
}
return
}
@@ -83,7 +131,7 @@ func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
return ret
}
-func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
+func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
dn.RLock()
defer dn.RUnlock()
vInfo, ok := dn.volumes[id]
@@ -123,8 +171,41 @@ func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
ret["Volumes"] = dn.GetVolumeCount()
+ ret["VolumeIds"] = dn.GetVolumeIds()
+ ret["EcShards"] = dn.GetEcShardCount()
ret["Max"] = dn.GetMaxVolumeCount()
ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl
return ret
}
+
+func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
+ m := &master_pb.DataNodeInfo{
+ Id: string(dn.Id()),
+ VolumeCount: uint64(dn.GetVolumeCount()),
+ MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
+ FreeVolumeCount: uint64(dn.FreeSpace()),
+ ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
+ RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
+ }
+ for _, v := range dn.GetVolumes() {
+ m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
+ }
+ for _, ecv := range dn.GetEcShards() {
+ m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
+ }
+ return m
+}
+
+// GetVolumeIds returns the human readable volume ids, limited to at most 100 entries.
+func (dn *DataNode) GetVolumeIds() string {
+ dn.RLock()
+ defer dn.RUnlock()
+ ids := make([]int, 0, len(dn.volumes))
+
+ for k := range dn.volumes {
+ ids = append(ids, int(k))
+ }
+
+ return util.HumanReadableIntsMax(100, ids...)
+}
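
Note: the sketch below is not part of this commit. It is a minimal, hypothetical illustration of how a caller (for example the master's heartbeat handling) might consume the three slices now returned by DataNode.UpdateVolumes; the helper name handleFullHeartbeat and the log messages are assumptions, not code from this repository.

package topology

import (
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
)

// handleFullHeartbeat is a hypothetical helper (not in this commit) showing
// how the (newVolumes, deletedVolumes, changeRO) results of UpdateVolumes
// could be consumed after a volume server reports its full volume list.
func handleFullHeartbeat(dn *DataNode, reported []storage.VolumeInfo) {
	newVolumes, deletedVolumes, changedRO := dn.UpdateVolumes(reported)

	for _, v := range newVolumes {
		glog.V(0).Infof("node %s added volume %d", dn.Url(), v.Id)
	}
	for _, v := range deletedVolumes {
		glog.V(0).Infof("node %s removed volume %d", dn.Url(), v.Id)
	}
	for _, v := range changedRO {
		// Read-only flips are reported separately so the caller can stop
		// (or resume) assigning writes to the affected volume.
		glog.V(0).Infof("node %s volume %d read-only=%v", dn.Url(), v.Id, v.ReadOnly)
	}
}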