diff options
| author | Chris Lu <chris.lu@gmail.com> | 2013-06-19 18:10:38 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2013-06-19 18:10:38 -0700 |
| commit | 50269b74ce615ab02f6bf64a2bc0fc9e71122267 (patch) | |
| tree | 887f63247a589cb027e65331b9243edcad61f479 /go/topology | |
| parent | 715d327df0ad64a70837711c664e1ef024e0bcc5 (diff) | |
| download | seaweedfs-50269b74ce615ab02f6bf64a2bc0fc9e71122267.tar.xz seaweedfs-50269b74ce615ab02f6bf64a2bc0fc9e71122267.zip | |
add dataCenter option when assign file keys
add dataCenter option when starting volume servers
some work related to freeze a volume. Not tested yet.
Diffstat (limited to 'go/topology')
| -rw-r--r-- | go/topology/configuration.go | 17 | ||||
| -rw-r--r-- | go/topology/data_node.go | 5 | ||||
| -rw-r--r-- | go/topology/node.go | 15 | ||||
| -rw-r--r-- | go/topology/node_list.go | 30 | ||||
| -rw-r--r-- | go/topology/node_list_test.go | 24 | ||||
| -rw-r--r-- | go/topology/topo_test.go | 5 | ||||
| -rw-r--r-- | go/topology/topology.go | 28 | ||||
| -rw-r--r-- | go/topology/volume_layout.go | 44 |
8 files changed, 119 insertions, 49 deletions
diff --git a/go/topology/configuration.go b/go/topology/configuration.go index 4c8424214..058600a7c 100644 --- a/go/topology/configuration.go +++ b/go/topology/configuration.go @@ -46,11 +46,20 @@ func (c *Configuration) String() string { return "" } -func (c *Configuration) Locate(ip string) (dc string, rack string) { - if c != nil && c.ip2location != nil { - if loc, ok := c.ip2location[ip]; ok { - return loc.dcName, loc.rackName +func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) { + if dcName == "" { + if c != nil && c.ip2location != nil { + if loc, ok := c.ip2location[ip]; ok { + return loc.dcName, loc.rackName + } + } + } else { + if rackName == "" { + return dcName, "DefaultRack" + } else { + return dcName, rackName } } + return "DefaultDataCenter", "DefaultRack" } diff --git a/go/topology/data_node.go b/go/topology/data_node.go index ea4ea5d39..3a6edb447 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -34,8 +34,11 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { dn.volumes[v.Id] = v } } +func (dn *DataNode) GetDataCenter() *DataCenter { + return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) +} func (dn *DataNode) GetTopology() *Topology { - p := dn.parent + p := dn.Parent() for p.Parent() != nil { p = p.Parent() } diff --git a/go/topology/node.go b/go/topology/node.go index 786f76702..d61f01244 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -10,7 +10,7 @@ type Node interface { Id() NodeId String() string FreeSpace() int - ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) + ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) UpAdjustVolumeCountDelta(volumeCountDelta int) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) @@ -26,6 +26,8 @@ type Node interface { CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) IsDataNode() bool + IsRack() bool + IsDataCenter() bool Children() map[NodeId]Node Parent() Node @@ -78,23 +80,26 @@ func (n *NodeImpl) Parent() Node { func (n *NodeImpl) GetValue() interface{} { return n.value } -func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) { +func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) { ret := false var assignedNode *DataNode for _, node := range n.children { freeSpace := node.FreeSpace() - //fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) + // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) if freeSpace <= 0 { continue } + if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) { + continue + } if r >= freeSpace { r -= freeSpace } else { if node.IsDataNode() && node.FreeSpace() > 0 { - //fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) + // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) return true, node.(*DataNode) } - ret, assignedNode = node.ReserveOneVolume(r, vid) + ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter) if ret { break } diff --git a/go/topology/node_list.go b/go/topology/node_list.go index db7723714..2be90b123 100644 --- a/go/topology/node_list.go +++ b/go/topology/node_list.go @@ -30,23 +30,37 @@ func (nl *NodeList) FreeSpace() int { return freeSpace } -func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) { +func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) { var list []Node + var preferredNode *Node + if firstNodeName != "" { + for _, n := range nl.nodes { + if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace { + preferredNode = &n + break + } + } + if preferredNode == nil { + return list, false + } + } + for _, n := range nl.nodes { - if n.FreeSpace() >= min { + if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) { list = append(list, n) } } - if n > len(list) { + if count > len(list) || count == len(list) && firstNodeName != "" { return nil, false } - for i := n; i > 0; i-- { + for i := len(list); i > 0; i-- { r := rand.Intn(i) - t := list[r] - list[r] = list[i-1] - list[i-1] = t + list[r], list[i-1] = list[i-1], list[r] + } + if firstNodeName != "" { + list[0] = *preferredNode } - return list[len(list)-n:], true + return list[:count], true } func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) { diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go index c6e530724..0037cbaa9 100644 --- a/go/topology/node_list_test.go +++ b/go/topology/node_list_test.go @@ -20,22 +20,38 @@ func TestXYZ(t *testing.T) { } nl := NewNodeList(topo.Children(), nil) - picked, ret := nl.RandomlyPickN(1, 0) + picked, ret := nl.RandomlyPickN(1, 0, "") if !ret || len(picked) != 1 { t.Error("need to randomly pick 1 node") } - picked, ret = nl.RandomlyPickN(4, 0) + picked, ret = nl.RandomlyPickN(1, 0, "dc1") + if !ret || len(picked) != 1 { + t.Error("need to randomly pick 1 node") + } + if picked[0].Id() != "dc1" { + t.Error("need to randomly pick 1 dc1 node") + } + + picked, ret = nl.RandomlyPickN(2, 0, "dc1") + if !ret || len(picked) != 2 { + t.Error("need to randomly pick 1 node") + } + if picked[0].Id() != "dc1" { + t.Error("need to randomly pick 2 with one dc1 node") + } + + picked, ret = nl.RandomlyPickN(4, 0, "") if !ret || len(picked) != 4 { t.Error("need to randomly pick 4 nodes") } - picked, ret = nl.RandomlyPickN(5, 0) + picked, ret = nl.RandomlyPickN(5, 0, "") if !ret || len(picked) != 5 { t.Error("need to randomly pick 5 nodes") } - picked, ret = nl.RandomlyPickN(6, 0) + picked, ret = nl.RandomlyPickN(6, 0, "") if ret || len(picked) != 0 { t.Error("can not randomly pick 6 nodes:", ret, picked) } diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go index 99e570821..d5ea08086 100644 --- a/go/topology/topo_test.go +++ b/go/topology/topo_test.go @@ -127,7 +127,10 @@ func TestReserveOneVolume(t *testing.T) { topo := setup(topologyLayout) rand.Seed(time.Now().UnixNano()) rand.Seed(1) - ret, node, vid := topo.RandomlyReserveOneVolume() + ret, node, vid := topo.RandomlyReserveOneVolume("dc1") + if node.Parent().Parent().Id() != NodeId("dc1") { + t.Fail() + } fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid) } diff --git a/go/topology/topology.go b/go/topology/topology.go index 5dcc56204..e488319d1 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -4,6 +4,7 @@ import ( "code.google.com/p/weed-fs/go/sequence" "code.google.com/p/weed-fs/go/storage" "errors" + "fmt" "io/ioutil" "log" "math/rand" @@ -71,25 +72,13 @@ func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode { return nil } -func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) { +func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) { if t.FreeSpace() <= 0 { + fmt.Println("Topology does not have free space left!") return false, nil, nil } vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid) - return ret, node, &vid -} - -func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) { - freeSpace := t.FreeSpace() - for _, node := range except { - freeSpace -= node.FreeSpace() - } - if freeSpace <= 0 { - return false, nil, nil - } - vid := t.NextVolumeId() - ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid) + ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter) return ret, node, &vid } @@ -98,12 +87,12 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return vid.Next() } -func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) { +func (t *Topology) PickForWrite(repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) { replicationTypeIndex := repType.GetReplicationLevelIndex() if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) } - vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count) + vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count, dataCenter) if err != nil || datanodes.Length() == 0 { return "", 0, nil, errors.New("No writable volumes avalable!") } @@ -114,6 +103,7 @@ func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (str func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout { replicationTypeIndex := repType.GetReplicationLevelIndex() if t.replicaType2VolumeLayout[replicationTypeIndex] == nil { + fmt.Println("adding replication type", repType) t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse) } return t.replicaType2VolumeLayout[replicationTypeIndex] @@ -123,8 +113,8 @@ func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) { t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn) } -func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) { - dcName, rackName := t.configuration.Locate(ip) +func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) { + dcName, rackName = t.configuration.Locate(ip, dcName, rackName) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) dn := rack.FindDataNode(ip, port) diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index cd725c132..d8ed49b0b 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -51,22 +51,52 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { return nil } -func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 { fmt.Println("No more writable volumes!") return nil, 0, nil, errors.New("No more writable volumes!") } - vid := vl.writables[rand.Intn(len_writers)] - locationList := vl.vid2location[vid] - if locationList != nil { + if dataCenter == "" { + vid := vl.writables[rand.Intn(len_writers)] + locationList := vl.vid2location[vid] + if locationList != nil { + return &vid, count, locationList, nil + } + return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") + } else { + var vid storage.VolumeId + var locationList *VolumeLocationList + counter := 0 + for _, v := range vl.writables { + volumeLocationList := vl.vid2location[v] + for _, dn := range volumeLocationList.list { + if dn.GetDataCenter().Id() == NodeId(dataCenter) { + counter++ + if rand.Intn(counter) < 1 { + vid, locationList = v, volumeLocationList + } + } + } + } return &vid, count, locationList, nil } - return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") + return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!") } -func (vl *VolumeLayout) GetActiveVolumeCount() int { - return len(vl.writables) +func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int { + if dataCenter == "" { + return len(vl.writables) + } + counter := 0 + for _, v := range vl.writables { + for _, dn := range vl.vid2location[v].list { + if dn.GetDataCenter().Id() == NodeId(dataCenter) { + counter++ + } + } + } + return counter } func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { |
