diff options
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/data_node.go | 3 | ||||
| -rw-r--r-- | weed/topology/node.go | 6 | ||||
| -rw-r--r-- | weed/topology/rack.go | 5 | ||||
| -rw-r--r-- | weed/topology/topology.go | 40 | ||||
| -rw-r--r-- | weed/topology/topology_event_handling.go | 14 |
5 files changed, 4 insertions, 64 deletions
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go index b7f039559..0ef8ae14e 100644 --- a/weed/topology/data_node.go +++ b/weed/topology/data_node.go @@ -15,7 +15,6 @@ type DataNode struct { Port int PublicUrl string LastSeen int64 // unix time in seconds - Dead bool } func NewDataNode(id string) *DataNode { @@ -30,7 +29,7 @@ func NewDataNode(id string) *DataNode { func (dn *DataNode) String() string { dn.RLock() defer dn.RUnlock() - return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) + return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl) } func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { diff --git a/weed/topology/node.go b/weed/topology/node.go index 4ce35f4b0..7383f9576 100644 --- a/weed/topology/node.go +++ b/weed/topology/node.go @@ -242,12 +242,6 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi if n.IsRack() { for _, c := range n.Children() { dn := c.(*DataNode) //can not cast n to DataNode - if dn.LastSeen < freshThreshHold { - if !dn.Dead { - dn.Dead = true - n.GetTopology().chanDeadDataNodes <- dn - } - } for _, v := range dn.GetVolumes() { if uint64(v.Size) >= volumeSizeLimit { //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) diff --git a/weed/topology/rack.go b/weed/topology/rack.go index 1ca2f8de8..a48d64323 100644 --- a/weed/topology/rack.go +++ b/weed/topology/rack.go @@ -32,11 +32,6 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol dn := c.(*DataNode) if dn.MatchLocation(ip, port) { dn.LastSeen = time.Now().Unix() - if dn.Dead { - dn.Dead = false - r.GetTopology().chanRecoveredDataNodes <- dn - dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) - } return dn } } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index 04b500053..ffd32ae21 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -7,7 +7,6 @@ import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/sequence" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" @@ -24,11 +23,9 @@ type Topology struct { Sequence sequence.Sequencer - chanDeadDataNodes chan *DataNode - chanRecoveredDataNodes chan *DataNode - chanFullVolumes chan storage.VolumeInfo + chanFullVolumes chan storage.VolumeInfo - configuration *Configuration + Configuration *Configuration RaftServer raft.Server } @@ -45,8 +42,6 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.Sequence = seq - t.chanDeadDataNodes = make(chan *DataNode) - t.chanRecoveredDataNodes = make(chan *DataNode) t.chanFullVolumes = make(chan storage.VolumeInfo) err := t.loadConfiguration(confFile) @@ -80,7 +75,7 @@ func (t *Topology) Leader() (string, error) { func (t *Topology) loadConfiguration(configurationFile string) error { b, e := ioutil.ReadFile(configurationFile) if e == nil { - t.configuration, e = NewConfiguration(b) + t.Configuration, e = NewConfiguration(b) return e } glog.V(0).Infoln("Using default configurations.") @@ -147,35 +142,6 @@ func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) } -func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { - t.Sequence.SetMax(*joinMessage.MaxFileKey) - dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) - dc := t.GetOrCreateDataCenter(dcName) - rack := dc.GetOrCreateRack(rackName) - dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port)) - if *joinMessage.IsInit && dn != nil { - t.UnRegisterDataNode(dn) - } - dn = rack.GetOrCreateDataNode(*joinMessage.Ip, - int(*joinMessage.Port), *joinMessage.PublicUrl, - int(*joinMessage.MaxVolumeCount)) - var volumeInfos []storage.VolumeInfo - for _, v := range joinMessage.Volumes { - if vi, err := storage.NewVolumeInfo(v); err == nil { - volumeInfos = append(volumeInfos, vi) - } else { - glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) - } - } - deletedVolumes := dn.UpdateVolumes(volumeInfos) - for _, v := range volumeInfos { - t.RegisterVolumeLayout(v, dn) - } - for _, v := range deletedVolumes { - t.UnRegisterVolumeLayout(v, dn) - } -} - func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { for _, c := range t.Children() { dc := c.(*DataCenter) diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go index 476aaf4d8..40019fdcd 100644 --- a/weed/topology/topology_event_handling.go +++ b/weed/topology/topology_event_handling.go @@ -31,12 +31,6 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { select { case v := <-t.chanFullVolumes: t.SetVolumeCapacityFull(v) - case dn := <-t.chanRecoveredDataNodes: - t.RegisterRecoveredDataNode(dn) - glog.V(0).Infoln("Recovered DataNode: %v", dn) - case dn := <-t.chanDeadDataNodes: - t.UnRegisterDataNode(dn) - glog.V(0).Infof("Dead DataNode: %v", dn) } } }() @@ -64,11 +58,3 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) dn.Parent().UnlinkChildNode(dn.Id()) } -func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { - for _, v := range dn.GetVolumes() { - vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) - if vl.isWritable(&v) { - vl.SetVolumeAvailable(dn, v.Id) - } - } -} |
