aboutsummaryrefslogtreecommitdiff
path: root/weed/topology
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2017-01-10 01:01:12 -0800
committerChris Lu <chris.lu@gmail.com>2017-01-10 01:01:12 -0800
commite46c3415f752e2e0c252c420adb882c4bcb7416b (patch)
tree65bb66899c2e97ebc340f09e7951e2f663ec4d78 /weed/topology
parent4beaaa06505220c80d502d7b3ebd8b8b71071f5f (diff)
downloadseaweedfs-e46c3415f752e2e0c252c420adb882c4bcb7416b.tar.xz
seaweedfs-e46c3415f752e2e0c252c420adb882c4bcb7416b.zip
gRpc for master~volume heartbeat
Diffstat (limited to 'weed/topology')
-rw-r--r--weed/topology/data_node.go3
-rw-r--r--weed/topology/node.go6
-rw-r--r--weed/topology/rack.go5
-rw-r--r--weed/topology/topology.go40
-rw-r--r--weed/topology/topology_event_handling.go14
5 files changed, 4 insertions, 64 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
index b7f039559..0ef8ae14e 100644
--- a/weed/topology/data_node.go
+++ b/weed/topology/data_node.go
@@ -15,7 +15,6 @@ type DataNode struct {
Port int
PublicUrl string
LastSeen int64 // unix time in seconds
- Dead bool
}
func NewDataNode(id string) *DataNode {
@@ -30,7 +29,7 @@ func NewDataNode(id string) *DataNode {
func (dn *DataNode) String() string {
dn.RLock()
defer dn.RUnlock()
- return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
+ return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
}
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
diff --git a/weed/topology/node.go b/weed/topology/node.go
index 4ce35f4b0..7383f9576 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -242,12 +242,6 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
- if dn.LastSeen < freshThreshHold {
- if !dn.Dead {
- dn.Dead = true
- n.GetTopology().chanDeadDataNodes <- dn
- }
- }
for _, v := range dn.GetVolumes() {
if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
index 1ca2f8de8..a48d64323 100644
--- a/weed/topology/rack.go
+++ b/weed/topology/rack.go
@@ -32,11 +32,6 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
dn.LastSeen = time.Now().Unix()
- if dn.Dead {
- dn.Dead = false
- r.GetTopology().chanRecoveredDataNodes <- dn
- dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
- }
return dn
}
}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 04b500053..ffd32ae21 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -24,11 +23,9 @@ type Topology struct {
Sequence sequence.Sequencer
- chanDeadDataNodes chan *DataNode
- chanRecoveredDataNodes chan *DataNode
- chanFullVolumes chan storage.VolumeInfo
+ chanFullVolumes chan storage.VolumeInfo
- configuration *Configuration
+ Configuration *Configuration
RaftServer raft.Server
}
@@ -45,8 +42,6 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.Sequence = seq
- t.chanDeadDataNodes = make(chan *DataNode)
- t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan storage.VolumeInfo)
err := t.loadConfiguration(confFile)
@@ -80,7 +75,7 @@ func (t *Topology) Leader() (string, error) {
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
- t.configuration, e = NewConfiguration(b)
+ t.Configuration, e = NewConfiguration(b)
return e
}
glog.V(0).Infoln("Using default configurations.")
@@ -147,35 +142,6 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
-func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
- t.Sequence.SetMax(*joinMessage.MaxFileKey)
- dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
- dc := t.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
- if *joinMessage.IsInit && dn != nil {
- t.UnRegisterDataNode(dn)
- }
- dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
- int(*joinMessage.Port), *joinMessage.PublicUrl,
- int(*joinMessage.MaxVolumeCount))
- var volumeInfos []storage.VolumeInfo
- for _, v := range joinMessage.Volumes {
- if vi, err := storage.NewVolumeInfo(v); err == nil {
- volumeInfos = append(volumeInfos, vi)
- } else {
- glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
- }
- }
- deletedVolumes := dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
- t.RegisterVolumeLayout(v, dn)
- }
- for _, v := range deletedVolumes {
- t.UnRegisterVolumeLayout(v, dn)
- }
-}
-
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
for _, c := range t.Children() {
dc := c.(*DataCenter)
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 476aaf4d8..40019fdcd 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -31,12 +31,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
select {
case v := <-t.chanFullVolumes:
t.SetVolumeCapacityFull(v)
- case dn := <-t.chanRecoveredDataNodes:
- t.RegisterRecoveredDataNode(dn)
- glog.V(0).Infoln("Recovered DataNode: %v", dn)
- case dn := <-t.chanDeadDataNodes:
- t.UnRegisterDataNode(dn)
- glog.V(0).Infof("Dead DataNode: %v", dn)
}
}
}()
@@ -64,11 +58,3 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.Parent().UnlinkChildNode(dn.Id())
}
-func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
- for _, v := range dn.GetVolumes() {
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
- if vl.isWritable(&v) {
- vl.SetVolumeAvailable(dn, v.Id)
- }
- }
-}