diff options
| author | Chris Lu <chris.lu@gmail.com> | 2013-02-10 03:49:51 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2013-02-10 03:49:51 -0800 |
| commit | 5071f528f649f3f99336c7d491ceef4859e34744 (patch) | |
| tree | 0c4bc8a286597cd79e22b1ce02cd9cd3b1c44602 /weed/topology | |
| parent | 55f2627fcf965c3765ad9b63878e9a22a59f4b55 (diff) | |
| download | seaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.tar.xz seaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.zip | |
testing compilation with remove package
Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/configuration.go | 56 | ||||
| -rw-r--r-- | weed/topology/configuration_test.go | 42 | ||||
| -rw-r--r-- | weed/topology/data_center.go | 41 | ||||
| -rw-r--r-- | weed/topology/data_node.go | 60 | ||||
| -rw-r--r-- | weed/topology/node.go | 200 | ||||
| -rw-r--r-- | weed/topology/node_list.go | 69 | ||||
| -rw-r--r-- | weed/topology/node_list_test.go | 39 | ||||
| -rw-r--r-- | weed/topology/rack.go | 64 | ||||
| -rw-r--r-- | weed/topology/topo_test.go | 127 | ||||
| -rw-r--r-- | weed/topology/topology.go | 148 | ||||
| -rw-r--r-- | weed/topology/topology_compact.go | 150 | ||||
| -rw-r--r-- | weed/topology/topology_event_handling.go | 67 | ||||
| -rw-r--r-- | weed/topology/topology_map.go | 50 | ||||
| -rw-r--r-- | weed/topology/volume_layout.go | 116 | ||||
| -rw-r--r-- | weed/topology/volume_location.go | 58 |
15 files changed, 0 insertions, 1287 deletions
diff --git a/weed/topology/configuration.go b/weed/topology/configuration.go deleted file mode 100644 index 4c8424214..000000000 --- a/weed/topology/configuration.go +++ /dev/null @@ -1,56 +0,0 @@ -package topology - -import ( - "encoding/xml" -) - -type loc struct { - dcName string - rackName string -} -type rack struct { - Name string `xml:"name,attr"` - Ips []string `xml:"Ip"` -} -type dataCenter struct { - Name string `xml:"name,attr"` - Racks []rack `xml:"Rack"` -} -type topology struct { - DataCenters []dataCenter `xml:"DataCenter"` -} -type Configuration struct { - XMLName xml.Name `xml:"Configuration"` - Topo topology `xml:"Topology"` - ip2location map[string]loc -} - -func NewConfiguration(b []byte) (*Configuration, error) { - c := &Configuration{} - err := xml.Unmarshal(b, c) - c.ip2location = make(map[string]loc) - for _, dc := range c.Topo.DataCenters { - for _, rack := range dc.Racks { - for _, ip := range rack.Ips { - c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name} - } - } - } - return c, err -} - -func (c *Configuration) String() string { - if b, e := xml.MarshalIndent(c, " ", " "); e == nil { - return string(b) - } - return "" -} - -func (c *Configuration) Locate(ip string) (dc string, rack string) { - if c != nil && c.ip2location != nil { - if loc, ok := c.ip2location[ip]; ok { - return loc.dcName, loc.rackName - } - } - return "DefaultDataCenter", "DefaultRack" -} diff --git a/weed/topology/configuration_test.go b/weed/topology/configuration_test.go deleted file mode 100644 index 35d82c058..000000000 --- a/weed/topology/configuration_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package topology - -import ( - "fmt" - "testing" -) - -func TestLoadConfiguration(t *testing.T) { - - confContent := ` - -<?xml version="1.0" encoding="UTF-8" ?> -<Configuration> - <Topology> - <DataCenter name="dc1"> - <Rack name="rack1"> - <Ip>192.168.1.1</Ip> - </Rack> - </DataCenter> - <DataCenter name="dc2"> - <Rack name="rack1"> - <Ip>192.168.1.2</Ip> - </Rack> - <Rack name="rack2"> - <Ip>192.168.1.3</Ip> - <Ip>192.168.1.4</Ip> - </Rack> - </DataCenter> - </Topology> -</Configuration> -` - c, err := NewConfiguration([]byte(confContent)) - - fmt.Printf("%s\n", c) - if err != nil { - t.Fatalf("unmarshal error:%s", err.Error()) - } - - if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { - t.Fatalf("unmarshal error:%s", c) - } -} diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go deleted file mode 100644 index a3b2b7d13..000000000 --- a/weed/topology/data_center.go +++ /dev/null @@ -1,41 +0,0 @@ -package topology - -import () - -type DataCenter struct { - NodeImpl -} - -func NewDataCenter(id string) *DataCenter { - dc := &DataCenter{} - dc.id = NodeId(id) - dc.nodeType = "DataCenter" - dc.children = make(map[NodeId]Node) - dc.NodeImpl.value = dc - return dc -} - -func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { - for _, c := range dc.Children() { - rack := c.(*Rack) - if string(rack.Id()) == rackName { - return rack - } - } - rack := NewRack(rackName) - dc.LinkChildNode(rack) - return rack -} - -func (dc *DataCenter) ToMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = dc.GetMaxVolumeCount() - m["Free"] = dc.FreeSpace() - var racks []interface{} - for _, c := range dc.Children() { - rack := c.(*Rack) - racks = append(racks, rack.ToMap()) - } - m["Racks"] = racks - return m -} diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go deleted file mode 100644 index dbb634af2..000000000 --- a/weed/topology/data_node.go +++ /dev/null @@ -1,60 +0,0 @@ -package topology - -import ( - _ "fmt" - "code.google.com/p/weed-fs/weed/storage" - "strconv" -) - -type DataNode struct { - NodeImpl - volumes map[storage.VolumeId]storage.VolumeInfo - Ip string - Port int - PublicUrl string - LastSeen int64 // unix time in seconds - Dead bool -} - -func NewDataNode(id string) *DataNode { - s := &DataNode{} - s.id = NodeId(id) - s.nodeType = "DataNode" - s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) - s.NodeImpl.value = s - return s -} -func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { - if _, ok := dn.volumes[v.Id]; !ok { - dn.volumes[v.Id] = v - dn.UpAdjustVolumeCountDelta(1) - dn.UpAdjustActiveVolumeCountDelta(1) - dn.UpAdjustMaxVolumeId(v.Id) - } else { - dn.volumes[v.Id] = v - } -} -func (dn *DataNode) GetTopology() *Topology { - p := dn.parent - for p.Parent() != nil { - p = p.Parent() - } - t := p.(*Topology) - return t -} -func (dn *DataNode) MatchLocation(ip string, port int) bool { - return dn.Ip == ip && dn.Port == port -} -func (dn *DataNode) Url() string { - return dn.Ip + ":" + strconv.Itoa(dn.Port) -} - -func (dn *DataNode) ToMap() interface{} { - ret := make(map[string]interface{}) - ret["Url"] = dn.Url() - ret["Volumes"] = dn.GetVolumeCount() - ret["Max"] = dn.GetMaxVolumeCount() - ret["Free"] = dn.FreeSpace() - ret["PublicUrl"] = dn.PublicUrl - return ret -} diff --git a/weed/topology/node.go b/weed/topology/node.go deleted file mode 100644 index fe69c57c0..000000000 --- a/weed/topology/node.go +++ /dev/null @@ -1,200 +0,0 @@ -package topology - -import ( - "fmt" - "code.google.com/p/weed-fs/weed/storage" -) - -type NodeId string -type Node interface { - Id() NodeId - String() string - FreeSpace() int - ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) - UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) - UpAdjustVolumeCountDelta(volumeCountDelta int) - UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) - UpAdjustMaxVolumeId(vid storage.VolumeId) - - GetVolumeCount() int - GetActiveVolumeCount() int - GetMaxVolumeCount() int - GetMaxVolumeId() storage.VolumeId - SetParent(Node) - LinkChildNode(node Node) - UnlinkChildNode(nodeId NodeId) - CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) - - IsDataNode() bool - Children() map[NodeId]Node - Parent() Node - - GetValue() interface{} //get reference to the topology,dc,rack,datanode -} -type NodeImpl struct { - id NodeId - volumeCount int - activeVolumeCount int - maxVolumeCount int - parent Node - children map[NodeId]Node - maxVolumeId storage.VolumeId - - //for rack, data center, topology - nodeType string - value interface{} -} - -func (n *NodeImpl) IsDataNode() bool { - return n.nodeType == "DataNode" -} -func (n *NodeImpl) IsRack() bool { - return n.nodeType == "Rack" -} -func (n *NodeImpl) IsDataCenter() bool { - return n.nodeType == "DataCenter" -} -func (n *NodeImpl) String() string { - if n.parent != nil { - return n.parent.String() + ":" + string(n.id) - } - return string(n.id) -} -func (n *NodeImpl) Id() NodeId { - return n.id -} -func (n *NodeImpl) FreeSpace() int { - return n.maxVolumeCount - n.volumeCount -} -func (n *NodeImpl) SetParent(node Node) { - n.parent = node -} -func (n *NodeImpl) Children() map[NodeId]Node { - return n.children -} -func (n *NodeImpl) Parent() Node { - return n.parent -} -func (n *NodeImpl) GetValue() interface{} { - return n.value -} -func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) { - ret := false - var assignedNode *DataNode - for _, node := range n.children { - freeSpace := node.FreeSpace() - //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) - if freeSpace <= 0 { - continue - } - if r >= freeSpace { - r -= freeSpace - } else { - if node.IsDataNode() && node.FreeSpace() > 0 { - //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) - } - ret, assignedNode = node.ReserveOneVolume(r, vid) - if ret { - break - } - } - } - return ret, assignedNode -} - -func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative - n.maxVolumeCount += maxVolumeCountDelta - if n.parent != nil { - n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative - n.volumeCount += volumeCountDelta - if n.parent != nil { - n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative - n.activeVolumeCount += activeVolumeCountDelta - if n.parent != nil { - n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) - } -} -func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative - if n.maxVolumeId < vid { - n.maxVolumeId = vid - if n.parent != nil { - n.parent.UpAdjustMaxVolumeId(vid) - } - } -} -func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { - return n.maxVolumeId -} -func (n *NodeImpl) GetVolumeCount() int { - return n.volumeCount -} -func (n *NodeImpl) GetActiveVolumeCount() int { - return n.activeVolumeCount -} -func (n *NodeImpl) GetMaxVolumeCount() int { - return n.maxVolumeCount -} - -func (n *NodeImpl) LinkChildNode(node Node) { - if n.children[node.Id()] == nil { - n.children[node.Id()] = node - n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) - n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) - n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) - n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) - node.SetParent(n) - fmt.Println(n, "adds child", node.Id()) - } -} - -func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { - node := n.children[nodeId] - node.SetParent(nil) - if node != nil { - delete(n.children, node.Id()) - n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) - n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) - n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) - fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount) - } -} - -func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) { - if n.IsRack() { - for _, c := range n.Children() { - dn := c.(*DataNode) //can not cast n to DataNode - if dn.LastSeen < freshThreshHold { - if !dn.Dead { - dn.Dead = true - n.GetTopology().chanDeadDataNodes <- dn - } - } - for _, v := range dn.volumes { - if uint64(v.Size) >= volumeSizeLimit { - //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) - n.GetTopology().chanFullVolumes <- v - } - } - } - } else { - for _, c := range n.Children() { - c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit) - } - } -} - -func (n *NodeImpl) GetTopology() *Topology { - var p Node - p = n - for p.Parent() != nil { - p = p.Parent() - } - return p.GetValue().(*Topology) -} diff --git a/weed/topology/node_list.go b/weed/topology/node_list.go deleted file mode 100644 index 597d39b93..000000000 --- a/weed/topology/node_list.go +++ /dev/null @@ -1,69 +0,0 @@ -package topology - -import ( - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" -) - -type NodeList struct { - nodes map[NodeId]Node - except map[string]Node -} - -func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList { - m := make(map[NodeId]Node, len(nodes)-len(except)) - for _, n := range nodes { - if except[n.String()] == nil { - m[n.Id()] = n - } - } - nl := &NodeList{nodes: m} - return nl -} - -func (nl *NodeList) FreeSpace() int { - freeSpace := 0 - for _, n := range nl.nodes { - freeSpace += n.FreeSpace() - } - return freeSpace -} - -func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { - var list []Node - for _, n := range nl.nodes { - if n.FreeSpace() >= min { - list = append(list, n) - } - } - if n > len(list) { - return nil, false - } - for i := n; i > 0; i-- { - r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t - } - return list[len(list)-n:], true -} - -func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) { - for _, node := range nl.nodes { - freeSpace := node.FreeSpace() - if randomVolumeIndex >= freeSpace { - randomVolumeIndex -= freeSpace - } else { - if node.IsDataNode() && node.FreeSpace() > 0 { - fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) - } - children := node.Children() - newNodeList := NewNodeList(children, nl.except) - return newNodeList.ReserveOneVolume(randomVolumeIndex, vid) - } - } - return false, nil - -} diff --git a/weed/topology/node_list_test.go b/weed/topology/node_list_test.go deleted file mode 100644 index 2fb4fa970..000000000 --- a/weed/topology/node_list_test.go +++ /dev/null @@ -1,39 +0,0 @@ -package topology - -import ( - _ "fmt" - "strconv" - "testing" -) - -func TestXYZ(t *testing.T) { - topo := NewTopology("topo", "/etc/weed.conf", "/tmp", "test", 234, 5) - for i := 0; i < 5; i++ { - dc := NewDataCenter("dc" + strconv.Itoa(i)) - dc.activeVolumeCount = i - dc.maxVolumeCount = 5 - topo.LinkChildNode(dc) - } - nl := NewNodeList(topo.Children(), nil) - - picked, ret := nl.RandomlyPickN(1) - if !ret || len(picked) != 1 { - t.Errorf("need to randomly pick 1 node") - } - - picked, ret = nl.RandomlyPickN(4) - if !ret || len(picked) != 4 { - t.Errorf("need to randomly pick 4 nodes") - } - - picked, ret = nl.RandomlyPickN(5) - if !ret || len(picked) != 5 { - t.Errorf("need to randomly pick 5 nodes") - } - - picked, ret = nl.RandomlyPickN(6) - if ret || len(picked) != 0 { - t.Errorf("can not randomly pick 6 nodes:", ret, picked) - } - -} diff --git a/weed/topology/rack.go b/weed/topology/rack.go deleted file mode 100644 index acc34417a..000000000 --- a/weed/topology/rack.go +++ /dev/null @@ -1,64 +0,0 @@ -package topology - -import ( - "strconv" - "time" -) - -type Rack struct { - NodeImpl -} - -func NewRack(id string) *Rack { - r := &Rack{} - r.id = NodeId(id) - r.nodeType = "Rack" - r.children = make(map[NodeId]Node) - r.NodeImpl.value = r - return r -} - -func (r *Rack) FindDataNode(ip string, port int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - return dn - } - } - return nil -} -func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { - for _, c := range r.Children() { - dn := c.(*DataNode) - if dn.MatchLocation(ip, port) { - dn.LastSeen = time.Now().Unix() - if dn.Dead { - dn.Dead = false - r.GetTopology().chanRecoveredDataNodes <- dn - dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) - } - return dn - } - } - dn := NewDataNode(ip + ":" + strconv.Itoa(port)) - dn.Ip = ip - dn.Port = port - dn.PublicUrl = publicUrl - dn.maxVolumeCount = maxVolumeCount - dn.LastSeen = time.Now().Unix() - r.LinkChildNode(dn) - return dn -} - -func (rack *Rack) ToMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = rack.GetMaxVolumeCount() - m["Free"] = rack.FreeSpace() - var dns []interface{} - for _, c := range rack.Children() { - dn := c.(*DataNode) - dns = append(dns, dn.ToMap()) - } - m["DataNodes"] = dns - return m -} diff --git a/weed/topology/topo_test.go b/weed/topology/topo_test.go deleted file mode 100644 index 99db15b5c..000000000 --- a/weed/topology/topo_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package topology - -import ( - "encoding/json" - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" - "testing" - "time" -) - -var topologyLayout = ` -{ - "dc1":{ - "rack1":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":2, "size":12312}, - {"id":3, "size":12312} - ], - "limit":3 - }, - "server2":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":10 - } - }, - "rack2":{ - "server1":{ - "volumes":[ - {"id":4, "size":12312}, - {"id":5, "size":12312}, - {"id":6, "size":12312} - ], - "limit":4 - }, - "server2":{ - "volumes":[], - "limit":4 - }, - "server3":{ - "volumes":[ - {"id":2, "size":12312}, - {"id":3, "size":12312}, - {"id":4, "size":12312} - ], - "limit":2 - } - } - }, - "dc2":{ - }, - "dc3":{ - "rack2":{ - "server1":{ - "volumes":[ - {"id":1, "size":12312}, - {"id":3, "size":12312}, - {"id":5, "size":12312} - ], - "limit":4 - } - } - } -} -` - -func setup(topologyLayout string) *Topology { - var data interface{} - err := json.Unmarshal([]byte(topologyLayout), &data) - if err != nil { - fmt.Println("error:", err) - } - - //need to connect all nodes first before server adding volumes - topo := NewTopology("mynetwork", "/etc/weed.conf", "/tmp", "test", 234, 5) - mTopology := data.(map[string]interface{}) - for dcKey, dcValue := range mTopology { - dc := NewDataCenter(dcKey) - dcMap := dcValue.(map[string]interface{}) - topo.LinkChildNode(dc) - for rackKey, rackValue := range dcMap { - rack := NewRack(rackKey) - rackMap := rackValue.(map[string]interface{}) - dc.LinkChildNode(rack) - for serverKey, serverValue := range rackMap { - server := NewDataNode(serverKey) - serverMap := serverValue.(map[string]interface{}) - rack.LinkChildNode(server) - for _, v := range serverMap["volumes"].([]interface{}) { - m := v.(map[string]interface{}) - vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion} - server.AddOrUpdateVolume(vi) - } - server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) - } - } - } - - return topo -} - -func TestRemoveDataCenter(t *testing.T) { - topo := setup(topologyLayout) - topo.UnlinkChildNode(NodeId("dc2")) - if topo.GetActiveVolumeCount() != 15 { - t.Fail() - } - topo.UnlinkChildNode(NodeId("dc3")) - if topo.GetActiveVolumeCount() != 12 { - t.Fail() - } -} - -func TestReserveOneVolume(t *testing.T) { - topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) - ret, node, vid := topo.RandomlyReserveOneVolume() - fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) - -} diff --git a/weed/topology/topology.go b/weed/topology/topology.go deleted file mode 100644 index ac5505a66..000000000 --- a/weed/topology/topology.go +++ /dev/null @@ -1,148 +0,0 @@ -package topology - -import ( - "errors" - "io/ioutil" - "math/rand" - "code.google.com/p/weed-fs/weed/directory" - "code.google.com/p/weed-fs/weed/sequence" - "code.google.com/p/weed-fs/weed/storage" -) - -type Topology struct { - NodeImpl - - //transient vid~servers mapping for each replication type - replicaType2VolumeLayout []*VolumeLayout - - pulse int64 - - volumeSizeLimit uint64 - - sequence sequence.Sequencer - - chanDeadDataNodes chan *DataNode - chanRecoveredDataNodes chan *DataNode - chanFullVolumes chan storage.VolumeInfo - - configuration *Configuration -} - -func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology { - t := &Topology{} - t.id = NodeId(id) - t.nodeType = "Topology" - t.NodeImpl.value = t - t.children = make(map[NodeId]Node) - t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) - t.pulse = int64(pulse) - t.volumeSizeLimit = volumeSizeLimit - - t.sequence = sequence.NewSequencer(dirname, sequenceFilename) - - t.chanDeadDataNodes = make(chan *DataNode) - t.chanRecoveredDataNodes = make(chan *DataNode) - t.chanFullVolumes = make(chan storage.VolumeInfo) - - t.loadConfiguration(confFile) - - return t -} - -func (t *Topology) loadConfiguration(configurationFile string) error { - b, e := ioutil.ReadFile(configurationFile) - if e == nil { - t.configuration, e = NewConfiguration(b) - } - return e -} - -func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - if list := vl.Lookup(vid); list != nil { - return list - } - } - } - return nil -} - -func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { - if t.FreeSpace() <= 0 { - return false, nil, nil - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid) - return ret, node, &vid -} - -func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) { - freeSpace := t.FreeSpace() - for _, node := range except { - freeSpace -= node.FreeSpace() - } - if freeSpace <= 0 { - return false, nil, nil - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid) - return ret, node, &vid -} - -func (t *Topology) NextVolumeId() storage.VolumeId { - vid := t.GetMaxVolumeId() - return vid.Next() -} - -func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) - } - vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count) - if err != nil { - return "", 0, nil, errors.New("No writable volumes avalable!") - } - fileId, count := t.sequence.NextFileId(count) - return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil -} - -func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { - t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) - } - return t.replicaType2VolumeLayout[replicationTypeIndex] -} - -func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) -} - -func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { - dcName, rackName := t.configuration.Locate(ip) - dc := t.GetOrCreateDataCenter(dcName) - rack := dc.GetOrCreateRack(rackName) - dn := rack.FindDataNode(ip, port) - if init && dn != nil { - t.UnRegisterDataNode(dn) - } - dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount) - for _, v := range volumeInfos { - dn.AddOrUpdateVolume(v) - t.RegisterVolumeLayout(&v, dn) - } -} - -func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { - for _, c := range t.Children() { - dc := c.(*DataCenter) - if string(dc.Id()) == dcName { - return dc - } - } - dc := NewDataCenter(dcName) - t.LinkChildNode(dc) - return dc -} diff --git a/weed/topology/topology_compact.go b/weed/topology/topology_compact.go deleted file mode 100644 index 980f72a6e..000000000 --- a/weed/topology/topology_compact.go +++ /dev/null @@ -1,150 +0,0 @@ -package topology - -import ( - "encoding/json" - "errors" - "fmt" - "net/url" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/util" - "time" -) - -func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { - ch := make(chan bool, locationlist.Length()) - for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { - //fmt.Println(index, "Check vacuuming", vid, "on", dn.Url()) - if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil { - //fmt.Println(index, "Error when checking vacuuming", vid, "on", url, e) - ch <- false - } else { - //fmt.Println(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret) - ch <- ret - } - }(index, dn.Url(), vid) - } - isCheckSuccess := true - for _ = range locationlist.list { - select { - case canVacuum := <-ch: - isCheckSuccess = isCheckSuccess && canVacuum - case <-time.After(30 * time.Minute): - isCheckSuccess = false - break - } - } - return isCheckSuccess -} -func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { - vl.removeFromWritable(vid) - ch := make(chan bool, locationlist.Length()) - for index, dn := range locationlist.list { - go func(index int, url string, vid storage.VolumeId) { - fmt.Println(index, "Start vacuuming", vid, "on", dn.Url()) - if e := vacuumVolume_Compact(url, vid); e != nil { - fmt.Println(index, "Error when vacuuming", vid, "on", url, e) - ch <- false - } else { - fmt.Println(index, "Complete vacuuming", vid, "on", url) - ch <- true - } - }(index, dn.Url(), vid) - } - isVacuumSuccess := true - for _ = range locationlist.list { - select { - case _ = <-ch: - case <-time.After(30 * time.Minute): - isVacuumSuccess = false - break - } - } - return isVacuumSuccess -} -func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { - isCommitSuccess := true - for _, dn := range locationlist.list { - fmt.Println("Start Commiting vacuum", vid, "on", dn.Url()) - if e := vacuumVolume_Commit(dn.Url(), vid); e != nil { - fmt.Println("Error when committing vacuum", vid, "on", dn.Url(), e) - isCommitSuccess = false - } else { - fmt.Println("Complete Commiting vacuum", vid, "on", dn.Url()) - } - } - if isCommitSuccess { - vl.setVolumeWritable(vid) - } - return isCommitSuccess -} -func (t *Topology) Vacuum(garbageThreshold string) int { - for _, vl := range t.replicaType2VolumeLayout { - if vl != nil { - for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(vl, vid, locationlist) { - batchVacuumVolumeCommit(vl, vid, locationlist) - } - } - } - } - } - return 0 -} - -type VacuumVolumeResult struct { - Result bool - Error string -} - -func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { - values := make(url.Values) - values.Add("volume", vid.String()) - values.Add("garbageThreshold", garbageThreshold) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values) - if err != nil { - fmt.Println("parameters:", values) - return err, false - } - var ret VacuumVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err, false - } - if ret.Error != "" { - return errors.New(ret.Error), false - } - return nil, ret.Result -} -func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { - values := make(url.Values) - values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values) - if err != nil { - return err - } - var ret VacuumVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil -} -func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { - values := make(url.Values) - values.Add("volume", vid.String()) - jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values) - if err != nil { - return err - } - var ret VacuumVolumeResult - if err := json.Unmarshal(jsonBlob, &ret); err != nil { - return err - } - if ret.Error != "" { - return errors.New(ret.Error) - } - return nil -} diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go deleted file mode 100644 index 084dd5b57..000000000 --- a/weed/topology/topology_event_handling.go +++ /dev/null @@ -1,67 +0,0 @@ -package topology - -import ( - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" - "time" -) - -func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { - go func() { - for { - freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval - t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) - time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - go func(garbageThreshold string) { - c := time.Tick(15 * time.Minute) - for _ = range c { - t.Vacuum(garbageThreshold) - } - }(garbageThreshold) - go func() { - for { - select { - case v := <-t.chanFullVolumes: - t.SetVolumeCapacityFull(v) - case dn := <-t.chanRecoveredDataNodes: - t.RegisterRecoveredDataNode(dn) - fmt.Println("DataNode", dn, "is back alive!") - case dn := <-t.chanDeadDataNodes: - t.UnRegisterDataNode(dn) - fmt.Println("DataNode", dn, "is dead!") - } - } - }() -} -func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.RepType) - if !vl.SetVolumeCapacityFull(volumeInfo.Id) { - return false - } - for _, dn := range vl.vid2location[volumeInfo.Id].list { - dn.UpAdjustActiveVolumeCountDelta(-1) - } - return true -} -func (t *Topology) UnRegisterDataNode(dn *DataNode) { - for _, v := range dn.volumes { - fmt.Println("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.RepType) - vl.SetVolumeUnavailable(dn, v.Id) - } - dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) - dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) - dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) - dn.Parent().UnlinkChildNode(dn.Id()) -} -func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { - for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.RepType) - if vl.isWritable(&v) { - vl.SetVolumeAvailable(dn, v.Id) - } - } -} diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go deleted file mode 100644 index b416ee943..000000000 --- a/weed/topology/topology_map.go +++ /dev/null @@ -1,50 +0,0 @@ -package topology - -import () - -func (t *Topology) ToMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() - var dcs []interface{} - for _, c := range t.Children() { - dc := c.(*DataCenter) - dcs = append(dcs, dc.ToMap()) - } - m["DataCenters"] = dcs - var layouts []interface{} - for _, layout := range t.replicaType2VolumeLayout { - if layout != nil { - layouts = append(layouts, layout.ToMap()) - } - } - m["layouts"] = layouts - return m -} - -func (t *Topology) ToVolumeMap() interface{} { - m := make(map[string]interface{}) - m["Max"] = t.GetMaxVolumeCount() - m["Free"] = t.FreeSpace() - dcs := make(map[NodeId]interface{}) - for _, c := range t.Children() { - dc := c.(*DataCenter) - racks := make(map[NodeId]interface{}) - for _, r := range dc.Children() { - rack := r.(*Rack) - dataNodes := make(map[NodeId]interface{}) - for _, d := range rack.Children() { - dn := d.(*DataNode) - var volumes []interface{} - for _, v := range dn.volumes { - volumes = append(volumes, v) - } - dataNodes[d.Id()] = volumes - } - racks[r.Id()] = dataNodes - } - dcs[dc.Id()] = racks - } - m["DataCenters"] = dcs - return m -} diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go deleted file mode 100644 index 494630d9f..000000000 --- a/weed/topology/volume_layout.go +++ /dev/null @@ -1,116 +0,0 @@ -package topology - -import ( - "errors" - "fmt" - "math/rand" - "code.google.com/p/weed-fs/weed/storage" -) - -type VolumeLayout struct { - repType storage.ReplicationType - vid2location map[storage.VolumeId]*VolumeLocationList - writables []storage.VolumeId // transient array of writable volume id - pulse int64 - volumeSizeLimit uint64 -} - -func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout { - return &VolumeLayout{ - repType: repType, - vid2location: make(map[storage.VolumeId]*VolumeLocationList), - writables: *new([]storage.VolumeId), - pulse: pulse, - volumeSizeLimit: volumeSizeLimit, - } -} - -func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { - if _, ok := vl.vid2location[v.Id]; !ok { - vl.vid2location[v.Id] = NewVolumeLocationList() - } - if vl.vid2location[v.Id].Add(dn) { - if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { - if vl.isWritable(v) { - vl.writables = append(vl.writables, v.Id) - } - } - } -} - -func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { - return uint64(v.Size) < vl.volumeSizeLimit && v.Version == storage.CurrentVersion -} - -func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { - return vl.vid2location[vid].list -} - -func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { - len_writers := len(vl.writables) - if len_writers <= 0 { - fmt.Println("No more writable volumes!") - return nil, 0, nil, errors.New("No more writable volumes!") - } - vid := vl.writables[rand.Intn(len_writers)] - locationList := vl.vid2location[vid] - if locationList != nil { - return &vid, count, locationList, nil - } - return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") -} - -func (vl *VolumeLayout) GetActiveVolumeCount() int { - return len(vl.writables) -} - -func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { - for i, v := range vl.writables { - if v == vid { - fmt.Println("Volume", vid, "becomes unwritable") - vl.writables = append(vl.writables[:i], vl.writables[i+1:]...) - return true - } - } - return false -} -func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { - for _, v := range vl.writables { - if v == vid { - return false - } - } - fmt.Println("Volume", vid, "becomes writable") - vl.writables = append(vl.writables, vid) - return true -} - -func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { - if vl.vid2location[vid].Remove(dn) { - if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { - fmt.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) - return vl.removeFromWritable(vid) - } - } - return false -} -func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { - if vl.vid2location[vid].Add(dn) { - if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { - return vl.setVolumeWritable(vid) - } - } - return false -} - -func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { - return vl.removeFromWritable(vid) -} - -func (vl *VolumeLayout) ToMap() interface{} { - m := make(map[string]interface{}) - m["replication"] = vl.repType.String() - m["writables"] = vl.writables - //m["locations"] = vl.vid2location - return m -} diff --git a/weed/topology/volume_location.go b/weed/topology/volume_location.go deleted file mode 100644 index 507a240b5..000000000 --- a/weed/topology/volume_location.go +++ /dev/null @@ -1,58 +0,0 @@ -package topology - -import () - -type VolumeLocationList struct { - list []*DataNode -} - -func NewVolumeLocationList() *VolumeLocationList { - return &VolumeLocationList{} -} - -func (dnll *VolumeLocationList) Head() *DataNode { - return dnll.list[0] -} - -func (dnll *VolumeLocationList) Length() int { - return len(dnll.list) -} - -func (dnll *VolumeLocationList) Add(loc *DataNode) bool { - for _, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - return false - } - } - dnll.list = append(dnll.list, loc) - return true -} - -func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { - for i, dnl := range dnll.list { - if loc.Ip == dnl.Ip && loc.Port == dnl.Port { - dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) - return true - } - } - return false -} - -func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { - var changed bool - for _, dnl := range dnll.list { - if dnl.LastSeen < freshThreshHold { - changed = true - break - } - } - if changed { - var l []*DataNode - for _, dnl := range dnll.list { - if dnl.LastSeen >= freshThreshHold { - l = append(l, dnl) - } - } - dnll.list = l - } -} |
