aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-19 01:45:30 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-19 01:45:30 -0700
commite19fd68f189c8842df21a666ade9f81aeec2435d (patch)
tree1390a172730ec9303f50d818bc0ae5d1429dcebe
parentf3da0906edf63b9634cf0c2864d88ad4587587ad (diff)
downloadseaweedfs-e19fd68f189c8842df21a666ade9f81aeec2435d.tar.xz
seaweedfs-e19fd68f189c8842df21a666ade9f81aeec2435d.zip
working now to start/stop volume servers
-rw-r--r--weed-fs/src/pkg/replication/volume_growth.go4
-rw-r--r--weed-fs/src/pkg/topology/data_center.go1
-rw-r--r--weed-fs/src/pkg/topology/data_node.go13
-rw-r--r--weed-fs/src/pkg/topology/node.go22
-rw-r--r--weed-fs/src/pkg/topology/rack.go1
-rw-r--r--weed-fs/src/pkg/topology/topology.go52
-rw-r--r--weed-fs/src/pkg/topology/volume_layout.go20
7 files changed, 82 insertions, 31 deletions
diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go
index 28dca68b4..41d7bd18c 100644
--- a/weed-fs/src/pkg/replication/volume_growth.go
+++ b/weed-fs/src/pkg/replication/volume_growth.go
@@ -126,9 +126,9 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers {
if err := AllocateVolume(server, vid, repType); err == nil {
- vi := &storage.VolumeInfo{Id: vid, Size: 0}
+ vi := storage.VolumeInfo{Id: vid, Size: 0}
server.AddOrUpdateVolume(vi)
- topo.RegisterVolumeLayout(vi, server)
+ topo.RegisterVolumeLayout(&vi, server)
fmt.Println("Created Volume", vid, "on", server)
} else {
fmt.Println("Failed to assign", vid, "to", servers)
diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go
index 5edf7c6eb..c661090e8 100644
--- a/weed-fs/src/pkg/topology/data_center.go
+++ b/weed-fs/src/pkg/topology/data_center.go
@@ -13,6 +13,7 @@ func NewDataCenter(id string) *DataCenter {
dc.id = NodeId(id)
dc.nodeType = "DataCenter"
dc.children = make(map[NodeId]Node)
+ dc.NodeImpl.value = dc
return dc
}
diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go
index 2305dddd2..cb625a41b 100644
--- a/weed-fs/src/pkg/topology/data_node.go
+++ b/weed-fs/src/pkg/topology/data_node.go
@@ -7,7 +7,7 @@ import (
type DataNode struct {
NodeImpl
- volumes map[storage.VolumeId]*storage.VolumeInfo
+ volumes map[storage.VolumeId]storage.VolumeInfo
Ip string
Port int
PublicUrl string
@@ -19,15 +19,12 @@ func NewDataNode(id string) *DataNode {
s := &DataNode{}
s.id = NodeId(id)
s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo)
+ s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+ s.NodeImpl.value = s
return s
}
-func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
- dn.AddOrUpdateVolume(&storage.VolumeInfo{Id: vid})
- return vid
-}
-func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) {
- if dn.volumes[v.Id] == nil {
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
+ if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustActiveVolumeCountDelta(1)
dn.UpAdjustMaxVolumeId(v.Id)
diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go
index 52315ca2f..044b832ef 100644
--- a/weed-fs/src/pkg/topology/node.go
+++ b/weed-fs/src/pkg/topology/node.go
@@ -17,7 +17,7 @@ type Node interface {
GetActiveVolumeCount() int
GetMaxVolumeCount() int
GetMaxVolumeId() storage.VolumeId
- setParent(Node)
+ SetParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
@@ -25,6 +25,8 @@ type Node interface {
IsDataNode() bool
Children() map[NodeId]Node
Parent() Node
+
+ GetValue()interface{} //get reference to the topology,dc,rack,datanode
}
type NodeImpl struct {
id NodeId
@@ -36,6 +38,7 @@ type NodeImpl struct {
//for rack, data center, topology
nodeType string
+ value interface{}
}
func (n *NodeImpl) IsDataNode() bool {
@@ -59,7 +62,7 @@ func (n *NodeImpl) Id() NodeId {
func (n *NodeImpl) FreeSpace() int {
return n.maxVolumeCount - n.activeVolumeCount
}
-func (n *NodeImpl) setParent(node Node) {
+func (n *NodeImpl) SetParent(node Node) {
n.parent = node
}
func (n *NodeImpl) Children() map[NodeId]Node {
@@ -68,6 +71,9 @@ func (n *NodeImpl) Children() map[NodeId]Node {
func (n *NodeImpl) Parent() Node {
return n.parent
}
+func (n *NodeImpl) GetValue()interface{}{
+ return n.value
+}
func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
ret := false
var assignedNode *DataNode
@@ -130,14 +136,14 @@ func (n *NodeImpl) LinkChildNode(node Node) {
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
- node.setParent(n)
+ node.SetParent(n)
fmt.Println(n, "adds child", node)
}
}
func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
node := n.children[nodeId]
- node.setParent(nil)
+ node.SetParent(nil)
if node != nil {
delete(n.children, node.Id())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
@@ -150,15 +156,15 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
- if dn.LastSeen > freshThreshHold {
+ if dn.LastSeen < freshThreshHold {
if !dn.Dead {
dn.Dead = true
n.GetTopology().chanDeadDataNodes <- dn
}
}
for _, v := range dn.volumes {
- if uint64(v.Size) < volumeSizeLimit {
- n.GetTopology().chanFullVolumes <- v
+ if uint64(v.Size) >= volumeSizeLimit {
+ n.GetTopology().chanFullVolumes <- &v
}
}
}
@@ -175,5 +181,5 @@ func (n *NodeImpl) GetTopology() *Topology{
for p.Parent() != nil {
p = p.Parent()
}
- return p.(*Topology)
+ return p.GetValue().(*Topology)
}
diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go
index bbcd594a2..16520d14a 100644
--- a/weed-fs/src/pkg/topology/rack.go
+++ b/weed-fs/src/pkg/topology/rack.go
@@ -15,6 +15,7 @@ func NewRack(id string) *Rack {
r.id = NodeId(id)
r.nodeType = "Rack"
r.children = make(map[NodeId]Node)
+ r.NodeImpl.value = r
return r
}
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
index 3943f9555..fc2c902b0 100644
--- a/weed-fs/src/pkg/topology/topology.go
+++ b/weed-fs/src/pkg/topology/topology.go
@@ -2,6 +2,7 @@ package topology
import (
"errors"
+ "fmt"
"math/rand"
"pkg/directory"
"pkg/sequence"
@@ -32,16 +33,20 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
+ t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
+
t.sequence = sequence.NewSequencer(dirname, filename)
+
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan *storage.VolumeInfo)
t.chanIncomplemteVolumes = make(chan *storage.VolumeInfo)
- t.chanRecoveredVolumes = make(chan *storage.VolumeInfo)
+ t.chanRecoveredVolumes = make(chan *storage.VolumeInfo)
+
return t
}
@@ -102,16 +107,10 @@ func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string,
rack := dc.GetOrCreateRack(ip)
dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
for _, v := range volumeInfos {
- dn.AddOrUpdateVolume(&v)
+ dn.AddOrUpdateVolume(v)
t.RegisterVolumeLayout(&v, dn)
}
}
-func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) {
- //TODO
-}
-func (t *Topology) UnRegisterDataNode(dn *DataNode) {
- //TODO
-}
func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter {
for _, c := range t.Children() {
@@ -155,14 +154,41 @@ func (t *Topology) StartRefreshWritableVolumes() {
go func() {
for {
select {
- case <-t.chanIncomplemteVolumes:
- case <-t.chanRecoveredVolumes:
- case fv := <-t.chanFullVolumes:
- t.SetVolumeReadOnly(fv)
- case <-t.chanRecoveredDataNodes:
+ case v := <-t.chanIncomplemteVolumes:
+ fmt.Println("Volume", v, "is incomplete!")
+ case v := <-t.chanRecoveredVolumes:
+ fmt.Println("Volume", v, "is recovered!")
+ case v := <-t.chanFullVolumes:
+ t.SetVolumeReadOnly(v)
+ fmt.Println("Volume", v, "is full!")
+ case dn := <-t.chanRecoveredDataNodes:
+ t.RegisterRecoveredDataNode(dn)
+ fmt.Println("DataNode", dn, "is back alive!")
case dn := <-t.chanDeadDataNodes:
t.UnRegisterDataNode(dn)
+ fmt.Println("DataNode", dn, "is dead!")
}
}
}()
}
+func (t *Topology) SetVolumeReadOnly(volumeInfo *storage.VolumeInfo) {
+ vl := t.GetVolumeLayout(volumeInfo.RepType)
+ vl.SetVolumeReadOnly(volumeInfo.Id)
+}
+func (t *Topology) SetVolumeWritable(volumeInfo *storage.VolumeInfo) {
+ vl := t.GetVolumeLayout(volumeInfo.RepType)
+ vl.SetVolumeWritable(volumeInfo.Id)
+}
+func (t *Topology) UnRegisterDataNode(dn *DataNode) {
+ for _, v := range dn.volumes {
+ fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
+ t.SetVolumeReadOnly(&v)
+ }
+}
+func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
+ for _, v := range dn.volumes {
+ if uint64(v.Size) < t.volumeSizeLimit {
+ t.SetVolumeWritable(&v)
+ }
+ }
+}
diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go
index 031840e88..0c3841c72 100644
--- a/weed-fs/src/pkg/topology/volume_layout.go
+++ b/weed-fs/src/pkg/topology/volume_layout.go
@@ -56,6 +56,26 @@ func (vl *VolumeLayout) GetActiveVolumeCount() int {
return len(vl.writables)
}
+func (vl *VolumeLayout) SetVolumeReadOnly(vid storage.VolumeId) bool {
+ for i, v := range vl.writables{
+ if v == vid {
+ vl.writables = append(vl.writables[:i],vl.writables[i+1:]...)
+ return true
+ }
+ }
+ return false
+}
+
+func (vl *VolumeLayout) SetVolumeWritable(vid storage.VolumeId) bool {
+ for _, v := range vl.writables{
+ if v == vid {
+ return false
+ }
+ }
+ vl.writables = append(vl.writables, vid)
+ return true
+}
+
func (vl *VolumeLayout) ToMap() interface{} {
m := make(map[string]interface{})
m["replication"] = vl.repType.String()