diff options
Diffstat (limited to 'weed/topology/topology_event_handling.go')
| -rw-r--r-- | weed/topology/topology_event_handling.go | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go new file mode 100644 index 000000000..737b94482 --- /dev/null +++ b/weed/topology/topology_event_handling.go @@ -0,0 +1,74 @@ +package topology + +import ( + "math/rand" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { + go func() { + for { + if t.IsLeader() { + freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval + t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + } + time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + go func(garbageThreshold string) { + c := time.Tick(15 * time.Minute) + for _ = range c { + if t.IsLeader() { + t.Vacuum(garbageThreshold) + } + } + }(garbageThreshold) + go func() { + for { + select { + case v := <-t.chanFullVolumes: + t.SetVolumeCapacityFull(v) + case dn := <-t.chanRecoveredDataNodes: + t.RegisterRecoveredDataNode(dn) + glog.V(0).Infoln("DataNode", dn, "is back alive!") + case dn := <-t.chanDeadDataNodes: + t.UnRegisterDataNode(dn) + glog.V(0).Infoln("DataNode", dn, "is dead!") + } + } + }() +} +func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) + if !vl.SetVolumeCapacityFull(volumeInfo.Id) { + return false + } + for _, dn := range vl.vid2location[volumeInfo.Id].list { + if !volumeInfo.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(-1) + } + } + return true +} +func (t *Topology) UnRegisterDataNode(dn *DataNode) { + for _, v := range dn.GetVolumes() { + glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + vl.SetVolumeUnavailable(dn, v.Id) + } + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) + dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) + dn.Parent().UnlinkChildNode(dn.Id()) +} +func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { + for _, v := range dn.GetVolumes() { + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + if vl.isWritable(&v) { + vl.SetVolumeAvailable(dn, v.Id) + } + } +} |
