diff options
Diffstat (limited to 'go/topology')
| -rw-r--r-- | go/topology/collection.go | 14 | ||||
| -rw-r--r-- | go/topology/data_center.go | 1 | ||||
| -rw-r--r-- | go/topology/node.go | 69 | ||||
| -rw-r--r-- | go/topology/node_list.go | 83 | ||||
| -rw-r--r-- | go/topology/node_list_test.go | 60 | ||||
| -rw-r--r-- | go/topology/rack.go | 1 | ||||
| -rw-r--r-- | go/topology/topo_test.go | 14 | ||||
| -rw-r--r-- | go/topology/topology.go | 20 | ||||
| -rw-r--r-- | go/topology/topology_event_handling.go | 6 | ||||
| -rw-r--r-- | go/topology/volume_layout.go | 16 |
10 files changed, 82 insertions, 202 deletions
diff --git a/go/topology/collection.go b/go/topology/collection.go index 0a7971424..8042369a9 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -13,17 +13,17 @@ type Collection struct { func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType) + c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount) return c } -func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout { - replicationTypeIndex := repType.GetReplicationLevelIndex() - if c.replicaType2VolumeLayout[replicationTypeIndex] == nil { - glog.V(0).Infoln("collection", c.Name, "adding replication type", repType) - c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit) +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout { + replicaPlacementIndex := rp.GetReplicationLevelIndex() + if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil { + glog.V(0).Infoln("collection", c.Name, "adding replication type", rp) + c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit) } - return c.replicaType2VolumeLayout[replicationTypeIndex] + return c.replicaType2VolumeLayout[replicaPlacementIndex] } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { diff --git a/go/topology/data_center.go b/go/topology/data_center.go index a3b2b7d13..ebd07803b 100644 --- a/go/topology/data_center.go +++ b/go/topology/data_center.go @@ -29,6 +29,7 @@ func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { func (dc *DataCenter) ToMap() interface{} { m := make(map[string]interface{}) + m["Id"] = dc.Id() m["Max"] = dc.GetMaxVolumeCount() m["Free"] = dc.FreeSpace() var racks []interface{} diff --git a/go/topology/node.go b/go/topology/node.go index cfd6f6489..abe363b39 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -3,6 +3,8 @@ package topology import ( "code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/storage" + "errors" + "math/rand" ) type NodeId string @@ -10,7 +12,7 @@ type Node interface { Id() NodeId String() string FreeSpace() int - ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) + ReserveOneVolume(r int) (*DataNode, error) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) UpAdjustVolumeCountDelta(volumeCountDelta int) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) @@ -47,6 +49,54 @@ type NodeImpl struct { value interface{} } +// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot +func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) bool) (firstNode Node, restNodes []Node, err error) { + candidates := make([]Node, 0, len(n.children)) + for _, node := range n.children { + if filterFirstNodeFn(node) { + candidates = append(candidates, node) + } + } + if len(candidates) == 0 { + return nil, nil, errors.New("No matching data node found!") + } + firstNode = candidates[rand.Intn(len(candidates))] + glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) + + restNodes = make([]Node, numberOfNodes-1) + candidates = candidates[:0] + for _, node := range n.children { + if node.Id() == firstNode.Id() { + continue + } + if node.FreeSpace() <= 0 { + continue + } + glog.V(2).Infoln("select rest node candidate:", node.Id()) + candidates = append(candidates, node) + } + glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") + ret := len(restNodes) == 0 + for k, node := range candidates { + if k < len(restNodes) { + restNodes[k] = node + if k == len(restNodes)-1 { + ret = true + } + } else { + r := rand.Intn(k + 1) + if r < len(restNodes) { + restNodes[r] = node + } + } + } + if !ret { + glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") + err = errors.New("Not enough data node found!") + } + return +} + func (n *NodeImpl) IsDataNode() bool { return n.nodeType == "DataNode" } @@ -80,32 +130,27 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) { - ret := false - var assignedNode *DataNode +func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) { for _, node := range n.children { freeSpace := node.FreeSpace() // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue } - if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) { - continue - } if r >= freeSpace { r -= freeSpace } else { if node.IsDataNode() && node.FreeSpace() > 0 { // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) + return node.(*DataNode), nil } - ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter) - if ret { - break + assignedNode, err = node.ReserveOneVolume(r) + if err != nil { + return } } } - return ret, assignedNode + return } func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative diff --git a/go/topology/node_list.go b/go/topology/node_list.go deleted file mode 100644 index bed151b54..000000000 --- a/go/topology/node_list.go +++ /dev/null @@ -1,83 +0,0 @@ -package topology - -import ( - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/storage" - "math/rand" -) - -type NodeList struct { - nodes map[NodeId]Node - except map[string]Node -} - -func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList { - m := make(map[NodeId]Node, len(nodes)-len(except)) - for _, n := range nodes { - if except[n.String()] == nil { - m[n.Id()] = n - } - } - nl := &NodeList{nodes: m} - return nl -} - -func (nl *NodeList) FreeSpace() int { - freeSpace := 0 - for _, n := range nl.nodes { - freeSpace += n.FreeSpace() - } - return freeSpace -} - -func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) { - var list []Node - var preferredNode *Node - if firstNodeName != "" { - for _, n := range nl.nodes { - if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace { - preferredNode = &n - break - } - } - if preferredNode == nil { - return list, false - } - } - - for _, n := range nl.nodes { - if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) { - list = append(list, n) - } - } - if count > len(list) || count == len(list) && firstNodeName != "" { - return nil, false - } - for i := len(list); i > 0; i-- { - r := rand.Intn(i) - list[r], list[i-1] = list[i-1], list[r] - } - if firstNodeName != "" { - list[0] = *preferredNode - } - return list[:count], true -} - -func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) { - for _, node := range nl.nodes { - freeSpace := node.FreeSpace() - if randomVolumeIndex >= freeSpace { - randomVolumeIndex -= freeSpace - } else { - if node.IsDataNode() && node.FreeSpace() > 0 { - glog.V(0).Infoln("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) - return true, node.(*DataNode) - } - children := node.Children() - newNodeList := NewNodeList(children, nl.except) - return newNodeList.ReserveOneVolume(randomVolumeIndex, vid) - } - } - return false, nil - -} diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go deleted file mode 100644 index c526f55f8..000000000 --- a/go/topology/node_list_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package topology - -import ( - "code.google.com/p/weed-fs/go/sequence" - _ "fmt" - "strconv" - "testing" -) - -func TestXYZ(t *testing.T) { - topo, err := NewTopology("topo", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5) - if err != nil { - t.Error("cannot create new topology:", err) - t.FailNow() - } - for i := 0; i < 5; i++ { - dc := NewDataCenter("dc" + strconv.Itoa(i)) - dc.activeVolumeCount = i - dc.maxVolumeCount = 5 - topo.LinkChildNode(dc) - } - nl := NewNodeList(topo.Children(), nil) - - picked, ret := nl.RandomlyPickN(1, 0, "") - if !ret || len(picked) != 1 { - t.Error("need to randomly pick 1 node") - } - - picked, ret = nl.RandomlyPickN(1, 0, "dc1") - if !ret || len(picked) != 1 { - t.Error("need to randomly pick 1 node") - } - if picked[0].Id() != "dc1" { - t.Error("need to randomly pick 1 dc1 node") - } - - picked, ret = nl.RandomlyPickN(2, 0, "dc1") - if !ret || len(picked) != 2 { - t.Error("need to randomly pick 1 node") - } - if picked[0].Id() != "dc1" { - t.Error("need to randomly pick 2 with one dc1 node") - } - - picked, ret = nl.RandomlyPickN(4, 0, "") - if !ret || len(picked) != 4 { - t.Error("need to randomly pick 4 nodes") - } - - picked, ret = nl.RandomlyPickN(5, 0, "") - if !ret || len(picked) != 5 { - t.Error("need to randomly pick 5 nodes") - } - - picked, ret = nl.RandomlyPickN(6, 0, "") - if ret || len(picked) != 0 { - t.Error("can not randomly pick 6 nodes:", ret, picked) - } - -} diff --git a/go/topology/rack.go b/go/topology/rack.go index acc34417a..40e19dd0d 100644 --- a/go/topology/rack.go +++ b/go/topology/rack.go @@ -52,6 +52,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol func (rack *Rack) ToMap() interface{} { m := make(map[string]interface{}) + m["Id"] = rack.Id() m["Max"] = rack.GetMaxVolumeCount() m["Free"] = rack.FreeSpace() var dns []interface{} diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go index c0edca7c1..f3ae2096b 100644 --- a/go/topology/topo_test.go +++ b/go/topology/topo_test.go @@ -5,9 +5,7 @@ import ( "code.google.com/p/weed-fs/go/storage" "encoding/json" "fmt" - "math/rand" "testing" - "time" ) var topologyLayout = ` @@ -124,15 +122,3 @@ func TestRemoveDataCenter(t *testing.T) { t.Fail() } } - -func TestReserveOneVolume(t *testing.T) { - topo := setup(topologyLayout) - rand.Seed(time.Now().UnixNano()) - rand.Seed(1) - ret, node, vid := topo.RandomlyReserveOneVolume("dc1") - if node.Parent().Parent().Id() != NodeId("dc1") { - t.Fail() - } - fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) - -} diff --git a/go/topology/topology.go b/go/topology/topology.go index 24b3ab337..1426f7a12 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -77,23 +77,13 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { return nil } -func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) { - if t.FreeSpace() <= 0 { - glog.V(0).Infoln("Topology does not have free space left!") - return false, nil, nil - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter) - return ret, node, &vid -} - func (t *Topology) NextVolumeId() storage.VolumeId { vid := t.GetMaxVolumeId() return vid.Next() } -func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { - vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter) +func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) { + vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -101,16 +91,16 @@ func (t *Topology) PickForWrite(collectionName string, repType storage.Replicati return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil } -func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout { +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout { _, ok := t.collectionMap[collectionName] if !ok { t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType) + return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp) } func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { - t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn) } func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index 5097e9874..5740c9a03 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { }() } func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { - vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType) + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement) if !vl.SetVolumeCapacityFull(volumeInfo.Id) { return false } @@ -53,7 +53,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { func (t *Topology) UnRegisterDataNode(dn *DataNode) { for _, v := range dn.volumes { glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) - vl := t.GetVolumeLayout(v.Collection, v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) vl.SetVolumeUnavailable(dn, v.Id) } dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) @@ -63,7 +63,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) { } func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { for _, v := range dn.volumes { - vl := t.GetVolumeLayout(v.Collection, v.RepType) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement) if vl.isWritable(&v) { vl.SetVolumeAvailable(dn, v.Id) } diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 8877d7ccf..40628b4a0 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -9,16 +9,16 @@ import ( ) type VolumeLayout struct { - repType storage.ReplicationType + rp *storage.ReplicaPlacement vid2location map[storage.VolumeId]*VolumeLocationList writables []storage.VolumeId // transient array of writable volume id volumeSizeLimit uint64 accessLock sync.Mutex } -func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout { return &VolumeLayout{ - repType: repType, + rp: rp, vid2location: make(map[storage.VolumeId]*VolumeLocationList), writables: *new([]storage.VolumeId), volumeSizeLimit: volumeSizeLimit, @@ -33,7 +33,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.vid2location[v.Id] = NewVolumeLocationList() } if vl.vid2location[v.Id].Add(dn) { - if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() { + if len(vl.vid2location[v.Id].list) == v.ReplicaPlacement.GetCopyCount() { if vl.isWritable(v) { vl.writables = append(vl.writables, v.Id) } else { @@ -135,8 +135,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) defer vl.accessLock.Unlock() if vl.vid2location[vid].Remove(dn) { - if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() { - glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount()) + if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() { + glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount()) return vl.removeFromWritable(vid) } } @@ -147,7 +147,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b defer vl.accessLock.Unlock() if vl.vid2location[vid].Add(dn) { - if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() { + if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() { return vl.setVolumeWritable(vid) } } @@ -164,7 +164,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { func (vl *VolumeLayout) ToMap() map[string]interface{} { m := make(map[string]interface{}) - m["replication"] = vl.repType.String() + m["replication"] = vl.rp.String() m["writables"] = vl.writables //m["locations"] = vl.vid2location return m |
