diff options
Diffstat (limited to 'weed/topology/topology_event_handling.go')
| -rw-r--r-- | weed/topology/topology_event_handling.go | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go new file mode 100644 index 000000000..6afd82dde --- /dev/null +++ b/weed/topology/topology_event_handling.go @@ -0,0 +1,67 @@ +package topology + +import ( + "fmt" + "math/rand" + "weed/storage" + "time" +) + +func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { + go func() { + for { + freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval + t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + go func(garbageThreshold string) { + c := time.Tick(15 * time.Minute) + for _ = range c { + t.Vacuum(garbageThreshold) + } + }(garbageThreshold) + go func() { + for { + select { + case v := <-t.chanFullVolumes: + t.SetVolumeCapacityFull(v) + case dn := <-t.chanRecoveredDataNodes: + t.RegisterRecoveredDataNode(dn) + fmt.Println("DataNode", dn, "is back alive!") + case dn := <-t.chanDeadDataNodes: + t.UnRegisterDataNode(dn) + fmt.Println("DataNode", dn, "is dead!") + } + } + }() +} +func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { + vl := t.GetVolumeLayout(volumeInfo.RepType) + if !vl.SetVolumeCapacityFull(volumeInfo.Id) { + return false + } + for _, dn := range vl.vid2location[volumeInfo.Id].list { + dn.UpAdjustActiveVolumeCountDelta(-1) + } + return true +} +func (t *Topology) UnRegisterDataNode(dn *DataNode) { + for _, v := range dn.volumes { + fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn) + vl := t.GetVolumeLayout(v.RepType) + vl.SetVolumeUnavailable(dn, v.Id) + } + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) + dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) + dn.Parent().UnlinkChildNode(dn.Id()) +} +func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { + for _, v := range dn.volumes { + vl := t.GetVolumeLayout(v.RepType) + if vl.isWritable(&v) { + vl.SetVolumeAvailable(dn, v.Id) + } + } +} |
