aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-19 16:48:04 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-19 16:48:04 -0700
commitfc9f1da1436b815a6aabb5927a8a90c36e61f15b (patch)
tree5fc215d803c0727ddd23e8e76d7574506a30d6aa
parente19fd68f189c8842df21a666ade9f81aeec2435d (diff)
downloadseaweedfs-fc9f1da1436b815a6aabb5927a8a90c36e61f15b.tar.xz
seaweedfs-fc9f1da1436b815a6aabb5927a8a90c36e61f15b.zip
handle volume server up/down events
-rw-r--r--weed-fs/src/pkg/topology/topology.go52
-rw-r--r--weed-fs/src/pkg/topology/topology_event_handling.go56
-rw-r--r--weed-fs/src/pkg/topology/volume_layout.go54
-rw-r--r--weed-fs/src/pkg/topology/volume_location.go25
4 files changed, 112 insertions, 75 deletions
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
index fc2c902b0..8dcd64dca 100644
--- a/weed-fs/src/pkg/topology/topology.go
+++ b/weed-fs/src/pkg/topology/topology.go
@@ -2,12 +2,10 @@ package topology
import (
"errors"
- "fmt"
"math/rand"
"pkg/directory"
"pkg/sequence"
"pkg/storage"
- "time"
)
type Topology struct {
@@ -142,53 +140,3 @@ func (t *Topology) ToMap() interface{} {
m["layouts"] = layouts
return m
}
-
-func (t *Topology) StartRefreshWritableVolumes() {
- go func() {
- for {
- freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
- t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
- time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
- }
- }()
- go func() {
- for {
- select {
- case v := <-t.chanIncomplemteVolumes:
- fmt.Println("Volume", v, "is incomplete!")
- case v := <-t.chanRecoveredVolumes:
- fmt.Println("Volume", v, "is recovered!")
- case v := <-t.chanFullVolumes:
- t.SetVolumeReadOnly(v)
- fmt.Println("Volume", v, "is full!")
- case dn := <-t.chanRecoveredDataNodes:
- t.RegisterRecoveredDataNode(dn)
- fmt.Println("DataNode", dn, "is back alive!")
- case dn := <-t.chanDeadDataNodes:
- t.UnRegisterDataNode(dn)
- fmt.Println("DataNode", dn, "is dead!")
- }
- }
- }()
-}
-func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) {
- vl := t.GetVolumeLayout(volumeInfo.RepType)
- vl.SetVolumeReadOnly(volumeInfo.Id)
-}
-func (t *Topology) SetVolumeWritable(volumeInfo *storage.VolumeInfo) {
- vl := t.GetVolumeLayout(volumeInfo.RepType)
- vl.SetVolumeWritable(volumeInfo.Id)
-}
-func (t *Topology) UnRegisterDataNode(dn *DataNode) {
- for _, v := range dn.volumes {
- fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
- t.SetVolumeReadOnly(&v)
- }
-}
-func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
- for _, v := range dn.volumes {
- if uint64(v.Size) < t.volumeSizeLimit {
- t.SetVolumeWritable(&v)
- }
- }
-}
diff --git a/weed-fs/src/pkg/topology/topology_event_handling.go b/weed-fs/src/pkg/topology/topology_event_handling.go
new file mode 100644
index 000000000..813826a61
--- /dev/null
+++ b/weed-fs/src/pkg/topology/topology_event_handling.go
@@ -0,0 +1,56 @@
+package topology
+
+import (
+ "fmt"
+ "math/rand"
+ "pkg/storage"
+ "time"
+)
+
+func (t *Topology) StartRefreshWritableVolumes() {
+ go func() {
+ for {
+ freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
+ t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
+ time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+ }
+ }()
+ go func() {
+ for {
+ select {
+ case v := <-t.chanIncomplemteVolumes:
+ fmt.Println("Volume", v, "is incomplete!")
+ case v := <-t.chanRecoveredVolumes:
+ fmt.Println("Volume", v, "is recovered!")
+ case v := <-t.chanFullVolumes:
+ t.SetVolumeCapacityFull(v)
+ fmt.Println("Volume", v, "is full!")
+ case dn := <-t.chanRecoveredDataNodes:
+ t.RegisterRecoveredDataNode(dn)
+ fmt.Println("DataNode", dn, "is back alive!")
+ case dn := <-t.chanDeadDataNodes:
+ t.UnRegisterDataNode(dn)
+ fmt.Println("DataNode", dn, "is dead!")
+ }
+ }
+ }()
+}
+func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) {
+ vl := t.GetVolumeLayout(volumeInfo.RepType)
+ vl.SetVolumeCapacityFull(volumeInfo.Id)
+}
+func (t *Topology) UnRegisterDataNode(dn *DataNode) {
+ for _, v := range dn.volumes {
+ fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
+ vl := t.GetVolumeLayout(v.RepType)
+ vl.SetVolumeUnavailable(dn, v.Id)
+ }
+}
+func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
+ for _, v := range dn.volumes {
+ if uint64(v.Size) < t.volumeSizeLimit {
+ vl := t.GetVolumeLayout(v.RepType)
+ vl.SetVolumeAvailable(dn, v.Id)
+ }
+ }
+}
diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go
index 0c3841c72..ba091b9bb 100644
--- a/weed-fs/src/pkg/topology/volume_layout.go
+++ b/weed-fs/src/pkg/topology/volume_layout.go
@@ -9,7 +9,7 @@ import (
type VolumeLayout struct {
repType storage.ReplicationType
- vid2location map[storage.VolumeId]*DataNodeLocationList
+ vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
pulse int64
volumeSizeLimit uint64
@@ -18,7 +18,7 @@ type VolumeLayout struct {
func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
return &VolumeLayout{
repType: repType,
- vid2location: make(map[storage.VolumeId]*DataNodeLocationList),
+ vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
pulse: pulse,
volumeSizeLimit: volumeSizeLimit,
@@ -27,7 +27,7 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if _, ok := vl.vid2location[v.Id]; !ok {
- vl.vid2location[v.Id] = NewDataNodeLocationList()
+ vl.vid2location[v.Id] = NewVolumeLocationList()
}
if vl.vid2location[v.Id].Add(dn) {
if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
@@ -38,7 +38,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
}
-func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *DataNodeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
fmt.Println("No more writable volumes!")
@@ -56,24 +56,44 @@ func (vl *VolumeLayout) GetActiveVolumeCount() int {
return len(vl.writables)
}
-func (vl *VolumeLayout) SetVolumeReadOnly(vid storage.VolumeId) bool {
- for i, v := range vl.writables{
- if v == vid {
- vl.writables = append(vl.writables[:i],vl.writables[i+1:]...)
- return true
+func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+ for i, v := range vl.writables {
+ if v == vid {
+ vl.writables = append(vl.writables[:i], vl.writables[i+1:]...)
+ return true
+ }
+ }
+ return false
+}
+func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+ for _, v := range vl.writables {
+ if v == vid {
+ return false
+ }
+ }
+ vl.writables = append(vl.writables, vid)
+ return true
+}
+
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+ if vl.vid2location[vid].Remove(dn) {
+ if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
+ return vl.removeFromWritable(vid)
}
}
return false
}
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+ if vl.vid2location[vid].Add(dn) {
+ if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
+ return vl.setVolumeWritable(vid)
+ }
+ }
+ return false
+}
-func (vl *VolumeLayout) SetVolumeWritable(vid storage.VolumeId) bool {
- for _, v := range vl.writables{
- if v == vid {
- return false
- }
- }
- vl.writables = append(vl.writables, vid)
- return true
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+ return vl.removeFromWritable(vid)
}
func (vl *VolumeLayout) ToMap() interface{} {
diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go
index f2e5dd894..16afb2dfb 100644
--- a/weed-fs/src/pkg/topology/volume_location.go
+++ b/weed-fs/src/pkg/topology/volume_location.go
@@ -2,19 +2,23 @@ package topology
import ()
-type DataNodeLocationList struct {
+type VolumeLocationList struct {
list []*DataNode
}
-func NewDataNodeLocationList() *DataNodeLocationList {
- return &DataNodeLocationList{}
+func NewVolumeLocationList() *VolumeLocationList {
+ return &VolumeLocationList{}
}
-func (dnll *DataNodeLocationList) Head() *DataNode {
+func (dnll *VolumeLocationList) Head() *DataNode {
return dnll.list[0]
}
-func (dnll *DataNodeLocationList) Add(loc *DataNode) bool {
+func (dnll *VolumeLocationList) Length() int {
+ return len(dnll.list)
+}
+
+func (dnll *VolumeLocationList) Add(loc *DataNode) bool {
for _, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
return false
@@ -23,8 +27,17 @@ func (dnll *DataNodeLocationList) Add(loc *DataNode) bool {
dnll.list = append(dnll.list, loc)
return true
}
+func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
+ for i, dnl := range dnll.list {
+ if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
+ dnll.list = append(dnll.list[:i],dnll.list[i+1:]...)
+ return true
+ }
+ }
+ return false
+}
-func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
+func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
var changed bool
for _, dnl := range dnll.list {
if dnl.LastSeen < freshThreshHold {