Diffstat (limited to 'go/topology')
-rw-r--r-- | go/topology/configuration.go | 56
-rw-r--r-- | go/topology/configuration_test.go | 42
-rw-r--r-- | go/topology/data_center.go | 41
-rw-r--r-- | go/topology/data_node.go | 60
-rw-r--r-- | go/topology/node.go | 200
-rw-r--r-- | go/topology/node_list.go | 69
-rw-r--r-- | go/topology/node_list_test.go | 39
-rw-r--r-- | go/topology/rack.go | 64
-rw-r--r-- | go/topology/topo_test.go | 127
-rw-r--r-- | go/topology/topology.go | 148
-rw-r--r-- | go/topology/topology_compact.go | 150
-rw-r--r-- | go/topology/topology_event_handling.go | 67
-rw-r--r-- | go/topology/topology_map.go | 50
-rw-r--r-- | go/topology/volume_layout.go | 116
-rw-r--r-- | go/topology/volume_location.go | 58
15 files changed, 1287 insertions, 0 deletions
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
new file mode 100644
index 000000000..4c8424214
--- /dev/null
+++ b/go/topology/configuration.go
@@ -0,0 +1,56 @@
+package topology
+
+import (
+	"encoding/xml"
+)
+
+type loc struct {
+	dcName   string
+	rackName string
+}
+type rack struct {
+	Name string   `xml:"name,attr"`
+	Ips  []string `xml:"Ip"`
+}
+type dataCenter struct {
+	Name  string `xml:"name,attr"`
+	Racks []rack `xml:"Rack"`
+}
+type topology struct {
+	DataCenters []dataCenter `xml:"DataCenter"`
+}
+type Configuration struct {
+	XMLName     xml.Name `xml:"Configuration"`
+	Topo        topology `xml:"Topology"`
+	ip2location map[string]loc
+}
+
+func NewConfiguration(b []byte) (*Configuration, error) {
+	c := &Configuration{}
+	err := xml.Unmarshal(b, c)
+	c.ip2location = make(map[string]loc)
+	for _, dc := range c.Topo.DataCenters {
+		for _, rack := range dc.Racks {
+			for _, ip := range rack.Ips {
+				c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
+			}
+		}
+	}
+	return c, err
+}
+
+func (c *Configuration) String() string {
+	if b, e := xml.MarshalIndent(c, " ", " "); e == nil {
+		return string(b)
+	}
+	return ""
+}
+
+func (c *Configuration) Locate(ip string) (dc string, rack string) {
+	if c != nil && c.ip2location != nil {
+		if loc, ok := c.ip2location[ip]; ok {
+			return loc.dcName, loc.rackName
+		}
+	}
+	return "DefaultDataCenter", "DefaultRack"
+}
diff --git a/go/topology/configuration_test.go b/go/topology/configuration_test.go
new file mode 100644
index 000000000..35d82c058
--- /dev/null
+++ b/go/topology/configuration_test.go
@@ -0,0 +1,42 @@
+package topology
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestLoadConfiguration(t *testing.T) {
+
+	confContent := `
+
+<?xml version="1.0" encoding="UTF-8" ?>
+<Configuration>
+  <Topology>
+    <DataCenter name="dc1">
+      <Rack name="rack1">
+        <Ip>192.168.1.1</Ip>
+      </Rack>
+    </DataCenter>
+    <DataCenter name="dc2">
+      <Rack name="rack1">
+        <Ip>192.168.1.2</Ip>
+      </Rack>
+      <Rack name="rack2">
+        <Ip>192.168.1.3</Ip>
+        <Ip>192.168.1.4</Ip>
+      </Rack>
+    </DataCenter>
+  </Topology>
+</Configuration>
+`
+	c, err := NewConfiguration([]byte(confContent))
+
+	fmt.Printf("%s\n", c)
+	if err != nil {
+		t.Fatalf("unmarshal error:%s", err.Error())
+	}
+
+	if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
+		t.Fatalf("unexpected topology:%s", c)
+	}
+}
diff --git a/go/topology/data_center.go b/go/topology/data_center.go
new file mode 100644
index 000000000..a3b2b7d13
--- /dev/null
+++ b/go/topology/data_center.go
@@ -0,0 +1,41 @@
+package topology
+
+import ()
+
+type DataCenter struct {
+	NodeImpl
+}
+
+func NewDataCenter(id string) *DataCenter {
+	dc := &DataCenter{}
+	dc.id = NodeId(id)
+	dc.nodeType = "DataCenter"
+	dc.children = make(map[NodeId]Node)
+	dc.NodeImpl.value = dc
+	return dc
+}
+
+func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
+	for _, c := range dc.Children() {
+		rack := c.(*Rack)
+		if string(rack.Id()) == rackName {
+			return rack
+		}
+	}
+	rack := NewRack(rackName)
+	dc.LinkChildNode(rack)
+	return rack
+}
+
+func (dc *DataCenter) ToMap() interface{} {
+	m := make(map[string]interface{})
+	m["Max"] = dc.GetMaxVolumeCount()
+	m["Free"] = dc.FreeSpace()
+	var racks []interface{}
+	for _, c := range dc.Children() {
+		rack := c.(*Rack)
+		racks = append(racks, rack.ToMap())
+	}
+	m["Racks"] = racks
+	return m
+}
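Note: the Configuration type in configuration.go above maps volume-server IPs to a (data center, rack) pair, falling back to defaults for unknown IPs. A minimal standalone sketch of a caller, assuming only the exported API shown in this commit (the hypothetical main package is not part of the diff):

    package main

    import (
    	"fmt"

    	"code.google.com/p/weed-fs/go/topology"
    )

    func main() {
    	conf, err := topology.NewConfiguration([]byte(`
    <Configuration>
      <Topology>
        <DataCenter name="dc1">
          <Rack name="rack1">
            <Ip>192.168.1.1</Ip>
          </Rack>
        </DataCenter>
      </Topology>
    </Configuration>`))
    	if err != nil {
    		panic(err)
    	}
    	// a configured ip resolves to its data center and rack
    	fmt.Println(conf.Locate("192.168.1.1")) // dc1 rack1
    	// an unknown ip falls back to the defaults
    	fmt.Println(conf.Locate("10.0.0.9")) // DefaultDataCenter DefaultRack
    }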
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
new file mode 100644
index 000000000..ba37f0d5f
--- /dev/null
+++ b/go/topology/data_node.go
@@ -0,0 +1,60 @@
+package topology
+
+import (
+	_ "fmt"
+	"code.google.com/p/weed-fs/go/storage"
+	"strconv"
+)
+
+type DataNode struct {
+	NodeImpl
+	volumes   map[storage.VolumeId]storage.VolumeInfo
+	Ip        string
+	Port      int
+	PublicUrl string
+	LastSeen  int64 // unix time in seconds
+	Dead      bool
+}
+
+func NewDataNode(id string) *DataNode {
+	s := &DataNode{}
+	s.id = NodeId(id)
+	s.nodeType = "DataNode"
+	s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+	s.NodeImpl.value = s
+	return s
+}
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
+	if _, ok := dn.volumes[v.Id]; !ok {
+		dn.volumes[v.Id] = v
+		dn.UpAdjustVolumeCountDelta(1)
+		dn.UpAdjustActiveVolumeCountDelta(1)
+		dn.UpAdjustMaxVolumeId(v.Id)
+	} else {
+		dn.volumes[v.Id] = v
+	}
+}
+func (dn *DataNode) GetTopology() *Topology {
+	p := dn.parent
+	for p.Parent() != nil {
+		p = p.Parent()
+	}
+	t := p.(*Topology)
+	return t
+}
+func (dn *DataNode) MatchLocation(ip string, port int) bool {
+	return dn.Ip == ip && dn.Port == port
+}
+func (dn *DataNode) Url() string {
+	return dn.Ip + ":" + strconv.Itoa(dn.Port)
+}
+
+func (dn *DataNode) ToMap() interface{} {
+	ret := make(map[string]interface{})
+	ret["Url"] = dn.Url()
+	ret["Volumes"] = dn.GetVolumeCount()
+	ret["Max"] = dn.GetMaxVolumeCount()
+	ret["Free"] = dn.FreeSpace()
+	ret["PublicUrl"] = dn.PublicUrl
+	return ret
+}
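Note: AddOrUpdateVolume only adjusts counters for a volume id it has not seen before; the UpAdjust*Delta calls then ripple the change up through rack, data center, and topology. A rough in-package sketch of that propagation (the values and the demo function are illustrative, not part of the commit):

    // sketch, inside package topology:
    func countPropagationDemo() {
    	rack := NewRack("rack1")
    	dn := NewDataNode("127.0.0.1:8080")
    	dn.UpAdjustMaxVolumeCountDelta(7) // this node can hold 7 volumes
    	rack.LinkChildNode(dn)            // rack absorbs dn's current counters

    	dn.AddOrUpdateVolume(storage.VolumeInfo{Id: storage.VolumeId(1)})
    	// dn and rack now both report GetVolumeCount() == 1, because the
    	// UpAdjust*Delta calls walk the parent chain upward.
    	dn.AddOrUpdateVolume(storage.VolumeInfo{Id: storage.VolumeId(1)})
    	// same id again: only the map entry is replaced, counters stay at 1.
    }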
diff --git a/go/topology/node.go b/go/topology/node.go
new file mode 100644
index 000000000..90826dfae
--- /dev/null
+++ b/go/topology/node.go
@@ -0,0 +1,200 @@
+package topology
+
+import (
+	"fmt"
+	"code.google.com/p/weed-fs/go/storage"
+)
+
+type NodeId string
+type Node interface {
+	Id() NodeId
+	String() string
+	FreeSpace() int
+	ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode)
+	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
+	UpAdjustVolumeCountDelta(volumeCountDelta int)
+	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
+	UpAdjustMaxVolumeId(vid storage.VolumeId)
+
+	GetVolumeCount() int
+	GetActiveVolumeCount() int
+	GetMaxVolumeCount() int
+	GetMaxVolumeId() storage.VolumeId
+	SetParent(Node)
+	LinkChildNode(node Node)
+	UnlinkChildNode(nodeId NodeId)
+	CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
+
+	IsDataNode() bool
+	Children() map[NodeId]Node
+	Parent() Node
+
+	GetValue() interface{} //get reference to the topology,dc,rack,datanode
+}
+type NodeImpl struct {
+	id                NodeId
+	volumeCount       int
+	activeVolumeCount int
+	maxVolumeCount    int
+	parent            Node
+	children          map[NodeId]Node
+	maxVolumeId       storage.VolumeId
+
+	//for rack, data center, topology
+	nodeType string
+	value    interface{}
+}
+
+func (n *NodeImpl) IsDataNode() bool {
+	return n.nodeType == "DataNode"
+}
+func (n *NodeImpl) IsRack() bool {
+	return n.nodeType == "Rack"
+}
+func (n *NodeImpl) IsDataCenter() bool {
+	return n.nodeType == "DataCenter"
+}
+func (n *NodeImpl) String() string {
+	if n.parent != nil {
+		return n.parent.String() + ":" + string(n.id)
+	}
+	return string(n.id)
+}
+func (n *NodeImpl) Id() NodeId {
+	return n.id
+}
+func (n *NodeImpl) FreeSpace() int {
+	return n.maxVolumeCount - n.volumeCount
+}
+func (n *NodeImpl) SetParent(node Node) {
+	n.parent = node
+}
+func (n *NodeImpl) Children() map[NodeId]Node {
+	return n.children
+}
+func (n *NodeImpl) Parent() Node {
+	return n.parent
+}
+func (n *NodeImpl) GetValue() interface{} {
+	return n.value
+}
+func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
+	ret := false
+	var assignedNode *DataNode
+	for _, node := range n.children {
+		freeSpace := node.FreeSpace()
+		//fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
+		if freeSpace <= 0 {
+			continue
+		}
+		if r >= freeSpace {
+			r -= freeSpace
+		} else {
+			if node.IsDataNode() && node.FreeSpace() > 0 {
+				//fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+				return true, node.(*DataNode)
+			}
+			ret, assignedNode = node.ReserveOneVolume(r, vid)
+			if ret {
+				break
+			}
+		}
+	}
+	return ret, assignedNode
+}
+
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
+	n.maxVolumeCount += maxVolumeCountDelta
+	if n.parent != nil {
+		n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
+	}
+}
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
+	n.volumeCount += volumeCountDelta
+	if n.parent != nil {
+		n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
+	}
+}
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
+	n.activeVolumeCount += activeVolumeCountDelta
+	if n.parent != nil {
+		n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
+	}
+}
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) {
+	if n.maxVolumeId < vid {
+		n.maxVolumeId = vid
+		if n.parent != nil {
+			n.parent.UpAdjustMaxVolumeId(vid)
+		}
+	}
+}
+func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+	return n.maxVolumeId
+}
+func (n *NodeImpl) GetVolumeCount() int {
+	return n.volumeCount
+}
+func (n *NodeImpl) GetActiveVolumeCount() int {
+	return n.activeVolumeCount
+}
+func (n *NodeImpl) GetMaxVolumeCount() int {
+	return n.maxVolumeCount
+}
+
+func (n *NodeImpl) LinkChildNode(node Node) {
+	if n.children[node.Id()] == nil {
+		n.children[node.Id()] = node
+		n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
+		n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
+		n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
+		n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
+		node.SetParent(n)
+		fmt.Println(n, "adds child", node.Id())
+	}
+}
+
+func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
+	node := n.children[nodeId]
+	if node != nil {
+		node.SetParent(nil)
+		delete(n.children, node.Id())
+		n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
+		n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
+		n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
+		fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
+	}
+}
+
+func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
+	if n.IsRack() {
+		for _, c := range n.Children() {
+			dn := c.(*DataNode) //can not cast n to DataNode
+			if dn.LastSeen < freshThreshHold {
+				if !dn.Dead {
+					dn.Dead = true
+					n.GetTopology().chanDeadDataNodes <- dn
+				}
+			}
+			for _, v := range dn.volumes {
+				if uint64(v.Size) >= volumeSizeLimit {
+					//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
+					n.GetTopology().chanFullVolumes <- v
+				}
+			}
+		}
+	} else {
+		for _, c := range n.Children() {
+			c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
+		}
+	}
+}
+
+func (n *NodeImpl) GetTopology() *Topology {
+	var p Node
+	p = n
+	for p.Parent() != nil {
+		p = p.Parent()
+	}
+	return p.GetValue().(*Topology)
+}
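Note: ReserveOneVolume in node.go implements a weighted random descent. The caller draws r in [0, FreeSpace()) at the root; each child either swallows the whole draw (r >= freeSpace: subtract and move on) or is entered recursively, so every free slot in the tree is equally likely. A standalone illustration of the same weighted pick over a flat list (hypothetical helper, not part of the diff):

    package main

    import (
    	"fmt"
    	"math/rand"
    )

    // pickWeighted returns the index whose "slot range" contains r,
    // mirroring the skip-or-descend logic of ReserveOneVolume.
    func pickWeighted(freeSpace []int, r int) int {
    	for i, fs := range freeSpace {
    		if r >= fs {
    			r -= fs // all of this node's slots lie before r; skip them
    		} else {
    			return i // r fell inside this node's slots
    		}
    	}
    	return -1 // r was >= total free space
    }

    func main() {
    	nodes := []int{3, 5, 2} // free slots per node, 10 in total
    	r := rand.Intn(10)
    	// e.g. r = 6 skips node 0 (6-3=3) and lands in node 1 (3 < 5)
    	fmt.Println("slot", r, "lands on node", pickWeighted(nodes, r))
    }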
"code.google.com/p/weed-fs/go/storage" +) + +type NodeList struct { + nodes map[NodeId]Node + except map[string]Node +} + +func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList { + m := make(map[NodeId]Node, len(nodes)-len(except)) + for _, n := range nodes { + if except[n.String()] == nil { + m[n.Id()] = n + } + } + nl := &NodeList{nodes: m} + return nl +} + +func (nl *NodeList) FreeSpace() int { + freeSpace := 0 + for _, n := range nl.nodes { + freeSpace += n.FreeSpace() + } + return freeSpace +} + +func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { + var list []Node + for _, n := range nl.nodes { + if n.FreeSpace() >= min { + list = append(list, n) + } + } + if n > len(list) { + return nil, false + } + for i := n; i > 0; i-- { + r := rand.Intn(i) + t := list[r] + list[r] = list[i-1] + list[i-1] = t + } + return list[len(list)-n:], true +} + +func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) { + for _, node := range nl.nodes { + freeSpace := node.FreeSpace() + if randomVolumeIndex >= freeSpace { + randomVolumeIndex -= freeSpace + } else { + if node.IsDataNode() && node.FreeSpace() > 0 { + fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) + return true, node.(*DataNode) + } + children := node.Children() + newNodeList := NewNodeList(children, nl.except) + return newNodeList.ReserveOneVolume(randomVolumeIndex, vid) + } + } + return false, nil + +} diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go new file mode 100644 index 000000000..2fb4fa970 --- /dev/null +++ b/go/topology/node_list_test.go @@ -0,0 +1,39 @@ +package topology + +import ( + _ "fmt" + "strconv" + "testing" +) + +func TestXYZ(t *testing.T) { + topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) + for i := 0; i < 5; i++ { + dc := NewDataCenter("dc" + strconv.Itoa(i)) + dc.activeVolumeCount = i + dc.maxVolumeCount = 5 + topo.LinkChildNode(dc) + } + nl := NewNodeList(topo.Children(), nil) + + picked, ret := nl.RandomlyPickN(1) + if !ret || len(picked) != 1 { + t.Errorf("need to randomly pick 1 node") + } + + picked, ret = nl.RandomlyPickN(4) + if !ret || len(picked) != 4 { + t.Errorf("need to randomly pick 4 nodes") + } + + picked, ret = nl.RandomlyPickN(5) + if !ret || len(picked) != 5 { + t.Errorf("need to randomly pick 5 nodes") + } + + picked, ret = nl.RandomlyPickN(6) + if ret || len(picked) != 0 { + t.Errorf("can not randomly pick 6 nodes:", ret, picked) + } + +} diff --git a/go/topology/rack.go b/go/topology/rack.go new file mode 100644 index 000000000..acc34417a --- /dev/null +++ b/go/topology/rack.go @@ -0,0 +1,64 @@ +package topology + +import ( + "strconv" + "time" +) + +type Rack struct { + NodeImpl +} + +func NewRack(id string) *Rack { + r := &Rack{} + r.id = NodeId(id) + r.nodeType = "Rack" + r.children = make(map[NodeId]Node) + r.NodeImpl.value = r + return r +} + +func (r *Rack) FindDataNode(ip string, port int) *DataNode { + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil +} +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + dn.LastSeen = time.Now().Unix() + if dn.Dead { + dn.Dead = false + r.GetTopology().chanRecoveredDataNodes <- dn + dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) + } + return dn + } + } + 
diff --git a/go/topology/rack.go b/go/topology/rack.go
new file mode 100644
index 000000000..acc34417a
--- /dev/null
+++ b/go/topology/rack.go
@@ -0,0 +1,64 @@
+package topology
+
+import (
+	"strconv"
+	"time"
+)
+
+type Rack struct {
+	NodeImpl
+}
+
+func NewRack(id string) *Rack {
+	r := &Rack{}
+	r.id = NodeId(id)
+	r.nodeType = "Rack"
+	r.children = make(map[NodeId]Node)
+	r.NodeImpl.value = r
+	return r
+}
+
+func (r *Rack) FindDataNode(ip string, port int) *DataNode {
+	for _, c := range r.Children() {
+		dn := c.(*DataNode)
+		if dn.MatchLocation(ip, port) {
+			return dn
+		}
+	}
+	return nil
+}
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+	for _, c := range r.Children() {
+		dn := c.(*DataNode)
+		if dn.MatchLocation(ip, port) {
+			dn.LastSeen = time.Now().Unix()
+			if dn.Dead {
+				dn.Dead = false
+				r.GetTopology().chanRecoveredDataNodes <- dn
+				dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
+			}
+			return dn
+		}
+	}
+	dn := NewDataNode(ip + ":" + strconv.Itoa(port))
+	dn.Ip = ip
+	dn.Port = port
+	dn.PublicUrl = publicUrl
+	dn.maxVolumeCount = maxVolumeCount
+	dn.LastSeen = time.Now().Unix()
+	r.LinkChildNode(dn)
+	return dn
+}
+
+func (rack *Rack) ToMap() interface{} {
+	m := make(map[string]interface{})
+	m["Max"] = rack.GetMaxVolumeCount()
+	m["Free"] = rack.FreeSpace()
+	var dns []interface{}
+	for _, c := range rack.Children() {
+		dn := c.(*DataNode)
+		dns = append(dns, dn.ToMap())
+	}
+	m["DataNodes"] = dns
+	return m
+}
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
new file mode 100644
index 000000000..f8af79b21
--- /dev/null
+++ b/go/topology/topo_test.go
@@ -0,0 +1,127 @@
+package topology
+
+import (
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"code.google.com/p/weed-fs/go/storage"
+	"testing"
+	"time"
+)
+
+var topologyLayout = `
+{
+  "dc1":{
+    "rack1":{
+      "server1":{
+        "volumes":[
+          {"id":1, "size":12312},
+          {"id":2, "size":12312},
+          {"id":3, "size":12312}
+        ],
+        "limit":3
+      },
+      "server2":{
+        "volumes":[
+          {"id":4, "size":12312},
+          {"id":5, "size":12312},
+          {"id":6, "size":12312}
+        ],
+        "limit":10
+      }
+    },
+    "rack2":{
+      "server1":{
+        "volumes":[
+          {"id":4, "size":12312},
+          {"id":5, "size":12312},
+          {"id":6, "size":12312}
+        ],
+        "limit":4
+      },
+      "server2":{
+        "volumes":[],
+        "limit":4
+      },
+      "server3":{
+        "volumes":[
+          {"id":2, "size":12312},
+          {"id":3, "size":12312},
+          {"id":4, "size":12312}
+        ],
+        "limit":2
+      }
+    }
+  },
+  "dc2":{
+  },
+  "dc3":{
+    "rack2":{
+      "server1":{
+        "volumes":[
+          {"id":1, "size":12312},
+          {"id":3, "size":12312},
+          {"id":5, "size":12312}
+        ],
+        "limit":4
+      }
+    }
+  }
+}
+`
+
+func setup(topologyLayout string) *Topology {
+	var data interface{}
+	err := json.Unmarshal([]byte(topologyLayout), &data)
+	if err != nil {
+		fmt.Println("error:", err)
+	}
+
+	//need to connect all nodes first before servers add volumes
+	topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5)
+	mTopology := data.(map[string]interface{})
+	for dcKey, dcValue := range mTopology {
+		dc := NewDataCenter(dcKey)
+		dcMap := dcValue.(map[string]interface{})
+		topo.LinkChildNode(dc)
+		for rackKey, rackValue := range dcMap {
+			rack := NewRack(rackKey)
+			rackMap := rackValue.(map[string]interface{})
+			dc.LinkChildNode(rack)
+			for serverKey, serverValue := range rackMap {
+				server := NewDataNode(serverKey)
+				serverMap := serverValue.(map[string]interface{})
+				rack.LinkChildNode(server)
+				for _, v := range serverMap["volumes"].([]interface{}) {
+					m := v.(map[string]interface{})
+					vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
+					server.AddOrUpdateVolume(vi)
+				}
+				server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+			}
+		}
+	}
+
+	return topo
+}
+
+func TestRemoveDataCenter(t *testing.T) {
+	topo := setup(topologyLayout)
+	topo.UnlinkChildNode(NodeId("dc2"))
+	if topo.GetActiveVolumeCount() != 15 {
+		t.Fail()
+	}
+	topo.UnlinkChildNode(NodeId("dc3"))
+	if topo.GetActiveVolumeCount() != 12 {
+		t.Fail()
+	}
+}
+
+func TestReserveOneVolume(t *testing.T) {
+	topo := setup(topologyLayout)
+	rand.Seed(time.Now().UnixNano())
+	//rand.Seed(1) // fixed seed, useful when debugging
+	ret, node, vid := topo.RandomlyReserveOneVolume()
+	fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
+
+}
"errors" + "io/ioutil" + "math/rand" + "code.google.com/p/weed-fs/go/directory" + "code.google.com/p/weed-fs/go/sequence" + "code.google.com/p/weed-fs/go/storage" +) + +type Topology struct { + NodeImpl + + //transient vid~servers mapping for each replication type + replicaType2VolumeLayout []*VolumeLayout + + pulse int64 + + volumeSizeLimit uint64 + + sequence sequence.Sequencer + + chanDeadDataNodes chan *DataNode + chanRecoveredDataNodes chan *DataNode + chanFullVolumes chan storage.VolumeInfo + + configuration *Configuration +} + +func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology { + t := &Topology{} + t.id = NodeId(id) + t.nodeType = "Topology" + t.NodeImpl.value = t + t.children = make(map[NodeId]Node) + t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + t.pulse = int64(pulse) + t.volumeSizeLimit = volumeSizeLimit + + t.sequence = sequence.NewSequencer(dirname, sequenceFilename) + + t.chanDeadDataNodes = make(chan *DataNode) + t.chanRecoveredDataNodes = make(chan *DataNode) + t.chanFullVolumes = make(chan storage.VolumeInfo) + + t.loadConfiguration(confFile) + + return t +} + +func (t *Topology) loadConfiguration(configurationFile string) error { + b, e := ioutil.ReadFile(configurationFile) + if e == nil { + t.configuration, e = NewConfiguration(b) + } + return e +} + +func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { + for _, vl := range t.replicaType2VolumeLayout { + if vl != nil { + if list := vl.Lookup(vid); list != nil { + return list + } + } + } + return nil +} + +func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { + if t.FreeSpace() <= 0 { + return false, nil, nil + } + vid := t.NextVolumeId() + ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid) + return ret, node, &vid +} + +func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) { + freeSpace := t.FreeSpace() + for _, node := range except { + freeSpace -= node.FreeSpace() + } + if freeSpace <= 0 { + return false, nil, nil + } + vid := t.NextVolumeId() + ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid) + return ret, node, &vid +} + +func (t *Topology) NextVolumeId() storage.VolumeId { + vid := t.GetMaxVolumeId() + return vid.Next() +} + +func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { + replicationTypeIndex := repType.GetReplicationLevelIndex() + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) + } + vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count) + if err != nil { + return "", 0, nil, errors.New("No writable volumes avalable!") + } + fileId, count := t.sequence.NextFileId(count) + return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil +} + +func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { + replicationTypeIndex := repType.GetReplicationLevelIndex() + if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) + } + return t.replicaType2VolumeLayout[replicationTypeIndex] +} + +func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { + 
diff --git a/go/topology/topology_compact.go b/go/topology/topology_compact.go
new file mode 100644
index 000000000..e25e394d4
--- /dev/null
+++ b/go/topology/topology_compact.go
@@ -0,0 +1,150 @@
+package topology
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/url"
+	"code.google.com/p/weed-fs/go/storage"
+	"code.google.com/p/weed-fs/go/util"
+	"time"
+)
+
+func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
+	ch := make(chan bool, locationlist.Length())
+	for index, dn := range locationlist.list {
+		go func(index int, url string, vid storage.VolumeId) {
+			//fmt.Println(index, "Check vacuuming", vid, "on", url)
+			if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
+				//fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e)
+				ch <- false
+			} else {
+				//fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
+				ch <- ret
+			}
+		}(index, dn.Url(), vid)
+	}
+	isCheckSuccess := true
+	for _ = range locationlist.list {
+		select {
+		case canVacuum := <-ch:
+			isCheckSuccess = isCheckSuccess && canVacuum
+		case <-time.After(30 * time.Minute):
+			isCheckSuccess = false
+			break
+		}
+	}
+	return isCheckSuccess
+}
+func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+	vl.removeFromWritable(vid)
+	ch := make(chan bool, locationlist.Length())
+	for index, dn := range locationlist.list {
+		go func(index int, url string, vid storage.VolumeId) {
+			fmt.Println(index, "Start vacuuming", vid, "on", url)
+			if e := vacuumVolume_Compact(url, vid); e != nil {
+				fmt.Println(index, "Error when vacuuming", vid, "on", url, e)
+				ch <- false
+			} else {
+				fmt.Println(index, "Complete vacuuming", vid, "on", url)
+				ch <- true
+			}
+		}(index, dn.Url(), vid)
+	}
+	isVacuumSuccess := true
+	for _ = range locationlist.list {
+		select {
+		case _ = <-ch:
+		case <-time.After(30 * time.Minute):
+			isVacuumSuccess = false
+			break
+		}
+	}
+	return isVacuumSuccess
+}
+func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+	isCommitSuccess := true
+	for _, dn := range locationlist.list {
+		fmt.Println("Start Committing vacuum", vid, "on", dn.Url())
+		if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
+			fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e)
+			isCommitSuccess = false
+		} else {
+			fmt.Println("Complete Committing vacuum", vid, "on", dn.Url())
+		}
+	}
+	if isCommitSuccess {
+		vl.setVolumeWritable(vid)
+	}
+	return isCommitSuccess
+}
+func (t *Topology) Vacuum(garbageThreshold string) int {
+	for _, vl := range t.replicaType2VolumeLayout {
+		if vl != nil {
+			for vid, locationlist := range vl.vid2location {
+				if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
+					if batchVacuumVolumeCompact(vl, vid, locationlist) {
+						batchVacuumVolumeCommit(vl, vid, locationlist)
+					}
+				}
+			}
+		}
+	}
+	return 0
+}
+
+type VacuumVolumeResult struct {
+	Result bool
+	Error  string
+}
+
+func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
+	values := make(url.Values)
+	values.Add("volume", vid.String())
+	values.Add("garbageThreshold", garbageThreshold)
+	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
+	if err != nil {
+		fmt.Println("parameters:", values)
+		return err, false
+	}
+	var ret VacuumVolumeResult
+	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+		return err, false
+	}
+	if ret.Error != "" {
+		return errors.New(ret.Error), false
+	}
+	return nil, ret.Result
+}
+func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
+	values := make(url.Values)
+	values.Add("volume", vid.String())
+	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
+	if err != nil {
+		return err
+	}
+	var ret VacuumVolumeResult
+	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+		return err
+	}
+	if ret.Error != "" {
+		return errors.New(ret.Error)
+	}
+	return nil
+}
+func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
+	values := make(url.Values)
+	values.Add("volume", vid.String())
+	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
+	if err != nil {
+		return err
+	}
+	var ret VacuumVolumeResult
+	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+		return err
+	}
+	if ret.Error != "" {
+		return errors.New(ret.Error)
+	}
+	return nil
+}
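Note: Vacuum runs a three-phase protocol per volume: every replica is first asked whether its garbage ratio exceeds the threshold (check), then the volume is taken off the writable list and compacted, and it is only re-marked writable once all replicas commit. The fan-out pattern used by the batch functions, one goroutine per replica feeding a buffered channel that is drained with a timeout, can be shown standalone (hypothetical replicas and a shortened timeout, not part of the diff):

    package main

    import (
    	"fmt"
    	"time"
    )

    // fan out one goroutine per replica, then collect every answer with a
    // timeout, mirroring batchVacuumVolumeCheck above.
    func main() {
    	replicas := []string{"a:8080", "b:8080", "c:8080"}
    	ch := make(chan bool, len(replicas)) // buffered: senders never block
    	for _, url := range replicas {
    		go func(url string) {
    			ch <- url != "b:8080" // pretend b's garbage ratio is too low
    		}(url)
    	}
    	ok := true
    	for range replicas {
    		select {
    		case v := <-ch:
    			ok = ok && v
    		case <-time.After(time.Second): // 30 minutes in the real code
    			ok = false
    		}
    	}
    	fmt.Println("all replicas agree to vacuum:", ok)
    }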
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
new file mode 100644
index 000000000..9093bf884
--- /dev/null
+++ b/go/topology/topology_event_handling.go
@@ -0,0 +1,67 @@
+package topology
+
+import (
+	"fmt"
+	"math/rand"
+	"code.google.com/p/weed-fs/go/storage"
+	"time"
+)
+
+func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
+	go func() {
+		for {
+			freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
+			t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
+			time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+		}
+	}()
+	go func(garbageThreshold string) {
+		c := time.Tick(15 * time.Minute)
+		for _ = range c {
+			t.Vacuum(garbageThreshold)
+		}
+	}(garbageThreshold)
+	go func() {
+		for {
+			select {
+			case v := <-t.chanFullVolumes:
+				t.SetVolumeCapacityFull(v)
+			case dn := <-t.chanRecoveredDataNodes:
+				t.RegisterRecoveredDataNode(dn)
+				fmt.Println("DataNode", dn, "is back alive!")
+			case dn := <-t.chanDeadDataNodes:
+				t.UnRegisterDataNode(dn)
+				fmt.Println("DataNode", dn, "is dead!")
+			}
+		}
+	}()
+}
+func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
+	vl := t.GetVolumeLayout(volumeInfo.RepType)
+	if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
+		return false
+	}
+	for _, dn := range vl.vid2location[volumeInfo.Id].list {
+		dn.UpAdjustActiveVolumeCountDelta(-1)
+	}
+	return true
+}
+func (t *Topology) UnRegisterDataNode(dn *DataNode) {
+	for _, v := range dn.volumes {
+		fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn)
+		vl := t.GetVolumeLayout(v.RepType)
+		vl.SetVolumeUnavailable(dn, v.Id)
+	}
+	dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
+	dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
+	dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
+	dn.Parent().UnlinkChildNode(dn.Id())
+}
+func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
+	for _, v := range dn.volumes {
+		vl := t.GetVolumeLayout(v.RepType)
+		if vl.isWritable(&v) {
+			vl.SetVolumeAvailable(dn, v.Id)
+		}
+	}
+}
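Note: StartRefreshWritableVolumes spins up three long-lived goroutines: a dead-node/full-volume scanner keyed to the pulse interval (with random jitter), a 15-minute vacuum ticker, and a dispatcher draining the three event channels. A hedged sketch of master-side wiring, in-package (the function name, paths, and sizes are illustrative, not from this commit):

    // sketch, inside package topology:
    func startMaster() {
    	topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "weed", 32*1024*1024, 5)
    	topo.StartRefreshWritableVolumes("0.3")
    	// dead, recovered, and full-volume events now arrive on the
    	// topology's channels and are applied by the dispatcher goroutine.
    	select {} // block forever; a real master would serve HTTP here
    }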
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
new file mode 100644
index 000000000..b416ee943
--- /dev/null
+++ b/go/topology/topology_map.go
@@ -0,0 +1,50 @@
+package topology
+
+import ()
+
+func (t *Topology) ToMap() interface{} {
+	m := make(map[string]interface{})
+	m["Max"] = t.GetMaxVolumeCount()
+	m["Free"] = t.FreeSpace()
+	var dcs []interface{}
+	for _, c := range t.Children() {
+		dc := c.(*DataCenter)
+		dcs = append(dcs, dc.ToMap())
+	}
+	m["DataCenters"] = dcs
+	var layouts []interface{}
+	for _, layout := range t.replicaType2VolumeLayout {
+		if layout != nil {
+			layouts = append(layouts, layout.ToMap())
+		}
+	}
+	m["layouts"] = layouts
+	return m
+}
+
+func (t *Topology) ToVolumeMap() interface{} {
+	m := make(map[string]interface{})
+	m["Max"] = t.GetMaxVolumeCount()
+	m["Free"] = t.FreeSpace()
+	dcs := make(map[NodeId]interface{})
+	for _, c := range t.Children() {
+		dc := c.(*DataCenter)
+		racks := make(map[NodeId]interface{})
+		for _, r := range dc.Children() {
+			rack := r.(*Rack)
+			dataNodes := make(map[NodeId]interface{})
+			for _, d := range rack.Children() {
+				dn := d.(*DataNode)
+				var volumes []interface{}
+				for _, v := range dn.volumes {
+					volumes = append(volumes, v)
+				}
+				dataNodes[d.Id()] = volumes
+			}
+			racks[r.Id()] = dataNodes
+		}
+		dcs[dc.Id()] = racks
+	}
+	m["DataCenters"] = dcs
+	return m
+}
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
new file mode 100644
index 000000000..b6e6e8bfe
--- /dev/null
+++ b/go/topology/volume_layout.go
@@ -0,0 +1,116 @@
+package topology
+
+import (
+	"errors"
+	"fmt"
+	"math/rand"
+	"code.google.com/p/weed-fs/go/storage"
+)
+
+type VolumeLayout struct {
+	repType         storage.ReplicationType
+	vid2location    map[storage.VolumeId]*VolumeLocationList
+	writables       []storage.VolumeId // transient array of writable volume ids
+	pulse           int64
+	volumeSizeLimit uint64
+}
+
+func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
+	return &VolumeLayout{
+		repType:         repType,
+		vid2location:    make(map[storage.VolumeId]*VolumeLocationList),
+		writables:       []storage.VolumeId{},
+		pulse:           pulse,
+		volumeSizeLimit: volumeSizeLimit,
+	}
+}
+
+func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+	if _, ok := vl.vid2location[v.Id]; !ok {
+		vl.vid2location[v.Id] = NewVolumeLocationList()
+	}
+	if vl.vid2location[v.Id].Add(dn) {
+		if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
+			if vl.isWritable(v) {
+				vl.writables = append(vl.writables, v.Id)
+			}
+		}
+	}
+}
+
+func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
+	return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion
+}
+
+func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+	return vl.vid2location[vid].list
+}
+
+func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) {
+	lenWriters := len(vl.writables)
+	if lenWriters <= 0 {
+		fmt.Println("No more writable volumes!")
+		return nil, 0, nil, errors.New("No more writable volumes!")
+	}
+	vid := vl.writables[rand.Intn(lenWriters)]
+	locationList := vl.vid2location[vid]
+	if locationList != nil {
+		return &vid, count, locationList, nil
+	}
+	return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+}
+
+func (vl *VolumeLayout) GetActiveVolumeCount() int {
+	return len(vl.writables)
+}
+
+func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+	for i, v := range vl.writables {
+		if v == vid {
+			fmt.Println("Volume", vid, "becomes unwritable")
+			vl.writables = append(vl.writables[:i], vl.writables[i+1:]...)
+			return true
+		}
+	}
+	return false
+}
+func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+	for _, v := range vl.writables {
+		if v == vid {
+			return false
+		}
+	}
+	fmt.Println("Volume", vid, "becomes writable")
+	vl.writables = append(vl.writables, vid)
+	return true
+}
+
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+	if vl.vid2location[vid].Remove(dn) {
+		if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
+			fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
+			return vl.removeFromWritable(vid)
+		}
+	}
+	return false
+}
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+	if vl.vid2location[vid].Add(dn) {
+		if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
+			return vl.setVolumeWritable(vid)
+		}
+	}
+	return false
+}
+
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+	return vl.removeFromWritable(vid)
+}
+
+func (vl *VolumeLayout) ToMap() interface{} {
+	m := make(map[string]interface{})
+	m["replication"] = vl.repType.String()
+	m["writables"] = vl.writables
+	//m["locations"] = vl.vid2location
+	return m
+}
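Note: a volume id enters writables only once vid2location holds a full replica set (GetCopyCount) and the volume is under the size limit; it leaves on SetVolumeCapacityFull or when a replica count drops below the requirement. The removal uses the standard order-preserving slice splice, shown standalone here with ints (hypothetical helper, not part of the diff):

    package main

    import "fmt"

    // remove deletes the first occurrence of v from s, preserving order —
    // the same append(s[:i], s[i+1:]...) splice used by removeFromWritable.
    func remove(s []int, v int) []int {
    	for i, x := range s {
    		if x == v {
    			return append(s[:i], s[i+1:]...)
    		}
    	}
    	return s
    }

    func main() {
    	fmt.Println(remove([]int{4, 7, 9}, 7)) // [4 9]
    }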
writable volumes!") + } + vid := vl.writables[rand.Intn(len_writers)] + locationList := vl.vid2location[vid] + if locationList != nil { + return &vid, count, locationList, nil + } + return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") +} + +func (vl *VolumeLayout) GetActiveVolumeCount() int { + return len(vl.writables) +} + +func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { + for i, v := range vl.writables { + if v == vid { + fmt.Println("Volume", vid, "becomes unwritable") + vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) + return true + } + } + return false +} +func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { + for _, v := range vl.writables { + if v == vid { + return false + } + } + fmt.Println("Volume", vid, "becomes writable") + vl.writables = append(vl.writables, vid) + return true +} + +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { + if vl.vid2location[vid].Remove(dn) { + if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { + fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) + return vl.removeFromWritable(vid) + } + } + return false +} +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { + if vl.vid2location[vid].Add(dn) { + if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { + return vl.setVolumeWritable(vid) + } + } + return false +} + +func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { + return vl.removeFromWritable(vid) +} + +func (vl *VolumeLayout) ToMap() interface{} { + m := make(map[string]interface{}) + m["replication"] = vl.repType.String() + m["writables"] = vl.writables + //m["locations"] = vl.vid2location + return m +} diff --git a/go/topology/volume_location.go b/go/topology/volume_location.go new file mode 100644 index 000000000..507a240b5 --- /dev/null +++ b/go/topology/volume_location.go @@ -0,0 +1,58 @@ +package topology + +import () + +type VolumeLocationList struct { + list []*DataNode +} + +func NewVolumeLocationList() *VolumeLocationList { + return &VolumeLocationList{} +} + +func (dnll *VolumeLocationList) Head() *DataNode { + return dnll.list[0] +} + +func (dnll *VolumeLocationList) Length() int { + return len(dnll.list) +} + +func (dnll *VolumeLocationList) Add(loc *DataNode) bool { + for _, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + return false + } + } + dnll.list = append(dnll.list, loc) + return true +} + +func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) + return true + } + } + return false +} + +func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { + var changed bool + for _, dnl := range dnll.list { + if dnl.LastSeen < freshThreshHold { + changed = true + break + } + } + if changed { + var l []*DataNode + for _, dnl := range dnll.list { + if dnl.LastSeen >= freshThreshHold { + l = append(l, dnl) + } + } + dnll.list = l + } +} |
