aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-18 14:05:12 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-18 14:05:12 -0700
commitf3da0906edf63b9634cf0c2864d88ad4587587ad (patch)
tree25b7e669f64c64d0f1d41f660656f58284d1bb52
parentb0e250d43753a0c983fd574700a0c6d726f30360 (diff)
downloadseaweedfs-f3da0906edf63b9634cf0c2864d88ad4587587ad.tar.xz
seaweedfs-f3da0906edf63b9634cf0c2864d88ad4587587ad.zip
channel based visitor pattern
-rw-r--r--weed-fs/src/pkg/directory/volume_mapping.go11
-rw-r--r--weed-fs/src/pkg/topology/data_node.go5
-rw-r--r--weed-fs/src/pkg/topology/node.go23
-rw-r--r--weed-fs/src/pkg/topology/rack.go4
-rw-r--r--weed-fs/src/pkg/topology/topology.go50
5 files changed, 61 insertions, 32 deletions
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go
index da41b3510..99aa90a5f 100644
--- a/weed-fs/src/pkg/directory/volume_mapping.go
+++ b/weed-fs/src/pkg/directory/volume_mapping.go
@@ -97,18 +97,9 @@ func (m *Mapper) remove(machine *Machine) {
foundIndex = index
}
}
- m.vid2machines[v.Id] = deleteFromSlice(foundIndex,m.vid2machines[v.Id])
+ m.vid2machines[v.Id] = append(m.vid2machines[v.Id][:foundIndex], m.vid2machines[v.Id][foundIndex+1:]...)
}
}
-func deleteFromSlice(i int, slice []*Machine) []*Machine{
- switch i {
- case -1://do nothing
- case 0: slice = slice[1:]
- case len(slice)-1: slice = slice[:len(slice)-1]
- default: slice = append(slice[:i], slice[i+1:]...)
- }
- return slice
-}
func (m *Mapper) StartRefreshWritableVolumes() {
go func() {
diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go
index 04c7cf111..2305dddd2 100644
--- a/weed-fs/src/pkg/topology/data_node.go
+++ b/weed-fs/src/pkg/topology/data_node.go
@@ -12,6 +12,7 @@ type DataNode struct {
Port int
PublicUrl string
LastSeen int64 // unix time in seconds
+ Dead bool
}
func NewDataNode(id string) *DataNode {
@@ -30,8 +31,8 @@ func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) {
dn.volumes[v.Id] = v
dn.UpAdjustActiveVolumeCountDelta(1)
dn.UpAdjustMaxVolumeId(v.Id)
- }else{
- dn.volumes[v.Id] = v
+ } else {
+ dn.volumes[v.Id] = v
}
}
func (dn *DataNode) GetTopology() *Topology {
diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go
index ddaf9f0b2..52315ca2f 100644
--- a/weed-fs/src/pkg/topology/node.go
+++ b/weed-fs/src/pkg/topology/node.go
@@ -20,7 +20,7 @@ type Node interface {
setParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
- CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId
+ CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
IsDataNode() bool
Children() map[NodeId]Node
@@ -146,25 +146,34 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
}
}
-func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId {
- var ret []storage.VolumeId
+func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
if dn.LastSeen > freshThreshHold {
- continue
+ if !dn.Dead {
+ dn.Dead = true
+ n.GetTopology().chanDeadDataNodes <- dn
+ }
}
for _, v := range dn.volumes {
if uint64(v.Size) < volumeSizeLimit {
- ret = append(ret, v.Id)
+ n.GetTopology().chanFullVolumes <- v
}
}
}
} else {
for _, c := range n.Children() {
- ret = append(ret, c.CollectWritableVolumes(freshThreshHold, volumeSizeLimit)...)
+ c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
}
}
+}
- return ret
+func (n *NodeImpl) GetTopology() *Topology{
+ var p Node
+ p = n
+ for p.Parent() != nil {
+ p = p.Parent()
+ }
+ return p.(*Topology)
}
diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go
index c819feb00..bbcd594a2 100644
--- a/weed-fs/src/pkg/topology/rack.go
+++ b/weed-fs/src/pkg/topology/rack.go
@@ -30,6 +30,10 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
dn.LastSeen = time.Now().Unix()
+ if dn.Dead {
+ dn.Dead = false
+ r.GetTopology().chanRecoveredDataNodes <- dn
+ }
dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
return dn
}
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
index a60768f14..3943f9555 100644
--- a/weed-fs/src/pkg/topology/topology.go
+++ b/weed-fs/src/pkg/topology/topology.go
@@ -20,6 +20,12 @@ type Topology struct {
volumeSizeLimit uint64
sequence sequence.Sequencer
+
+ chanDeadDataNodes chan *DataNode
+ chanRecoveredDataNodes chan *DataNode
+ chanFullVolumes chan *storage.VolumeInfo
+ chanIncomplemteVolumes chan *storage.VolumeInfo
+ chanRecoveredVolumes chan *storage.VolumeInfo
}
func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology {
@@ -31,6 +37,11 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.sequence = sequence.NewSequencer(dirname, filename)
+ t.chanDeadDataNodes = make(chan *DataNode)
+ t.chanRecoveredDataNodes = make(chan *DataNode)
+ t.chanFullVolumes = make(chan *storage.VolumeInfo)
+ t.chanIncomplemteVolumes = make(chan *storage.VolumeInfo)
+ t.chanRecoveredVolumes = make(chan *storage.VolumeInfo)
return t
}
@@ -95,6 +106,12 @@ func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string,
t.RegisterVolumeLayout(&v, dn)
}
}
+func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) {
+ //TODO
+}
+func (t *Topology) UnRegisterDataNode(dn *DataNode) {
+ //TODO
+}
func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter {
for _, c := range t.Children() {
@@ -128,17 +145,24 @@ func (t *Topology) ToMap() interface{} {
}
func (t *Topology) StartRefreshWritableVolumes() {
- go func() {
- for {
- t.refreshWritableVolumes()
- time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
- }
- }()
-}
-
-func (t *Topology) refreshWritableVolumes() {
- freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
- //setting Writers, copy-on-write because of possible updating, this needs some future work!
- t.CollectWritableVolumes(freshThreshHold, t.volumeSizeLimit)
- //TODO: collect writable columes for each replication type
+ go func() {
+ for {
+ freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
+ t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
+ time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+ }
+ }()
+ go func() {
+ for {
+ select {
+ case <-t.chanIncomplemteVolumes:
+ case <-t.chanRecoveredVolumes:
+ case fv := <-t.chanFullVolumes:
+ t.SetVolumeReadOnly(fv)
+ case <-t.chanRecoveredDataNodes:
+ case dn := <-t.chanDeadDataNodes:
+ t.UnRegisterDataNode(dn)
+ }
+ }
+ }()
}