Diffstat (limited to 'src/weed/replication')
-rw-r--r-- src/weed/replication/volume_growth.go      | 195
-rw-r--r-- src/weed/replication/volume_growth_test.go | 129
2 files changed, 324 insertions(+), 0 deletions(-)
diff --git a/src/weed/replication/volume_growth.go b/src/weed/replication/volume_growth.go
new file mode 100644
index 000000000..0aae05bb3
--- /dev/null
+++ b/src/weed/replication/volume_growth.go
@@ -0,0 +1,195 @@
+package replication
+
+import (
+ "errors"
+ "fmt"
+ "math/rand"
+ "sync"
+ "weed/operation"
+ "weed/storage"
+ "weed/topology"
+)
+
+/*
+This package resolves these replica placement issues:
+1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
+2. in times of tight storage, how to reduce the replica level
+3. optimizing for hot data on faster disks and cold data on cheaper storage
+4. volume allocation for each bucket
+*/
+
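+// VolumeGrowth decides how many new volumes to create per growth request at each
+// replica level; accessLock serializes calls so only one growth pass runs at a time.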
+type VolumeGrowth struct {
+ copy1factor int
+ copy2factor int
+ copy3factor int
+ copyAll int
+
+ accessLock sync.Mutex
+}
+
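+// NewDefaultVolumeGrowth uses the default factors: grow more volumes at lower
+// replica levels and fewer at higher ones.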
+func NewDefaultVolumeGrowth() *VolumeGrowth {
+ return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
+}
+
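+// GrowByType grows volumes for the given replication type, using the growth
+// factor configured for that replica level. The three digits of a type name
+// (e.g. Copy110) indicate how many extra copies go to other data centers, other
+// racks in the same data center, and other servers in the same rack, respectively.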
+func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) {
+ switch repType {
+ case storage.Copy000:
+ return vg.GrowByCountAndType(vg.copy1factor, repType, topo)
+ case storage.Copy001:
+ return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ case storage.Copy010:
+ return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ case storage.Copy100:
+ return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ case storage.Copy110:
+ return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+ case storage.Copy200:
+ return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+ }
+ return 0, errors.New("unknown replication type")
+}
+func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
+ vg.accessLock.Lock()
+ defer vg.accessLock.Unlock()
+
+ counter = 0
+ switch repType {
+ case storage.Copy000:
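+ //no replication: reserve a single volume on any server with free space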
+ for i := 0; i < count; i++ {
+ if ok, server, vid := topo.RandomlyReserveOneVolume(); ok {
+ if err = vg.grow(topo, *vid, repType, server); err == nil {
+ counter++
+ }
+ }
+ }
+ case storage.Copy001:
+ for i := 0; i < count; i++ {
+ //randomly pick one server, then reserve a second copy on a different server in the same rack
+ if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+ rack := server1.Parent()
+ exclusion := make(map[string]topology.Node)
+ exclusion[server1.String()] = server1
+ newNodeList := topology.NewNodeList(rack.Children(), exclusion)
+ if newNodeList.FreeSpace() > 0 {
+ if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
+ if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
+ counter++
+ }
+ }
+ }
+ }
+ }
+ case storage.Copy010:
+ for i := 0; i < count; i++ {
+ //randomly pick one server, then reserve a second copy on a different rack in the same data center
+ if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+ rack := server1.Parent()
+ dc := rack.Parent()
+ exclusion := make(map[string]topology.Node)
+ exclusion[rack.String()] = rack
+ newNodeList := topology.NewNodeList(dc.Children(), exclusion)
+ if newNodeList.FreeSpace() > 0 {
+ if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
+ if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
+ counter++
+ }
+ }
+ }
+ }
+ }
+ case storage.Copy100:
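+ //pick two different data centers and reserve one copy in each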
+ for i := 0; i < count; i++ {
+ nl := topology.NewNodeList(topo.Children(), nil)
+ picked, ret := nl.RandomlyPickN(2, 1)
+ vid := topo.NextVolumeId()
+ if ret {
+ var servers []*topology.DataNode
+ for _, n := range picked {
+ if n.FreeSpace() > 0 {
+ if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
+ servers = append(servers, server)
+ }
+ }
+ }
+ if len(servers) == 2 {
+ if err = vg.grow(topo, vid, repType, servers...); err == nil {
+ counter++
+ }
+ }
+ }
+ }
+ case storage.Copy110:
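+ //pick two data centers: two copies on different racks in the one with more free space, one copy in the other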
+ for i := 0; i < count; i++ {
+ nl := topology.NewNodeList(topo.Children(), nil)
+ picked, ret := nl.RandomlyPickN(2, 2)
+ vid := topo.NextVolumeId()
+ if ret {
+ var servers []*topology.DataNode
+ dc1, dc2 := picked[0], picked[1]
+ if dc2.FreeSpace() > dc1.FreeSpace() {
+ dc1, dc2 = dc2, dc1
+ }
+ if dc1.FreeSpace() > 0 {
+ if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok {
+ servers = append(servers, server1)
+ rack := server1.Parent()
+ exclusion := make(map[string]topology.Node)
+ exclusion[rack.String()] = rack
+ newNodeList := topology.NewNodeList(dc1.Children(), exclusion)
+ if newNodeList.FreeSpace() > 0 {
+ if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 {
+ servers = append(servers, server2)
+ }
+ }
+ }
+ }
+ if dc2.FreeSpace() > 0 {
+ if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok {
+ servers = append(servers, server)
+ }
+ }
+ if len(servers) == 3 {
+ if err = vg.grow(topo, vid, repType, servers...); err == nil {
+ counter++
+ }
+ }
+ }
+ }
+ case storage.Copy200:
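+ //pick three different data centers and reserve one copy in each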
+ for i := 0; i < count; i++ {
+ nl := topology.NewNodeList(topo.Children(), nil)
+ picked, ret := nl.RandomlyPickN(3, 1)
+ vid := topo.NextVolumeId()
+ if ret {
+ var servers []*topology.DataNode
+ for _, n := range picked {
+ if n.FreeSpace() > 0 {
+ if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
+ servers = append(servers, server)
+ }
+ }
+ }
+ if len(servers) == 3 {
+ if err = vg.grow(topo, vid, repType, servers...); err == nil {
+ counter++
+ }
+ }
+ }
+ }
+ }
+ return
+}
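+// grow allocates vid on every chosen server, records the new volume on the
+// server, and registers it in the topology; a failure on any server aborts
+// the whole attempt.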
+func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
+ for _, server := range servers {
+ if err := operation.AllocateVolume(server, vid, repType); err == nil {
+ vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType, Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ topo.RegisterVolumeLayout(&vi, server)
+ fmt.Println("Created Volume", vid, "on", server)
+ } else {
+ fmt.Println("Failed to assign", vid, "to", servers)
+ return errors.New("Failed to assign " + vid.String())
+ }
+ }
+ return nil
+}
diff --git a/src/weed/replication/volume_growth_test.go b/src/weed/replication/volume_growth_test.go
new file mode 100644
index 000000000..ed7467785
--- /dev/null
+++ b/src/weed/replication/volume_growth_test.go
@@ -0,0 +1,129 @@
+package replication
+
+import (
+ "encoding/json"
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+ "weed/storage"
+ "weed/topology"
+)
+
+var topologyLayout = `
+{
+ "dc1":{
+ "rack1":{
+ "server1":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":2, "size":12312},
+ {"id":3, "size":12312}
+ ],
+ "limit":3
+ },
+ "server2":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":10
+ }
+ },
+ "rack2":{
+ "server1":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":4
+ },
+ "server2":{
+ "volumes":[],
+ "limit":4
+ },
+ "server3":{
+ "volumes":[
+ {"id":2, "size":12312},
+ {"id":3, "size":12312},
+ {"id":4, "size":12312}
+ ],
+ "limit":2
+ }
+ }
+ },
+ "dc2":{
+ },
+ "dc3":{
+ "rack2":{
+ "server1":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":3, "size":12312},
+ {"id":5, "size":12312}
+ ],
+ "limit":4
+ }
+ }
+ }
+}
+`
+
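+// setup builds an in-memory topology from the JSON layout above: data centers
+// contain racks, racks contain data nodes, and each node carries its existing
+// volumes plus a volume-count limit.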
+func setup(topologyLayout string) *topology.Topology {
+ var data interface{}
+ err := json.Unmarshal([]byte(topologyLayout), &data)
+ if err != nil {
+ fmt.Println("error:", err)
+ }
+ fmt.Println("data:", data)
+
+ //all nodes must be linked into the topology before volumes are added to the servers
+ topo := topology.NewTopology("mynetwork", "/etc/weedfs/weedfs.conf", "/tmp", "testing", 32*1024, 5)
+ mTopology := data.(map[string]interface{})
+ for dcKey, dcValue := range mTopology {
+ dc := topology.NewDataCenter(dcKey)
+ dcMap := dcValue.(map[string]interface{})
+ topo.LinkChildNode(dc)
+ for rackKey, rackValue := range dcMap {
+ rack := topology.NewRack(rackKey)
+ rackMap := rackValue.(map[string]interface{})
+ dc.LinkChildNode(rack)
+ for serverKey, serverValue := range rackMap {
+ server := topology.NewDataNode(serverKey)
+ serverMap := serverValue.(map[string]interface{})
+ rack.LinkChildNode(server)
+ for _, v := range serverMap["volumes"].([]interface{}) {
+ m := v.(map[string]interface{})
+ vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64)), Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ }
+ server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ }
+ }
+ }
+
+ return topo
+}
+
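+// dc2 is empty and dc3 holds 3 volumes, so the active volume count should stay
+// at 15 after removing dc2 and drop to 12 after removing dc3.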
+func TestRemoveDataCenter(t *testing.T) {
+ topo := setup(topologyLayout)
+ topo.UnlinkChildNode(topology.NodeId("dc2"))
+ if topo.GetActiveVolumeCount() != 15 {
+ t.Fail()
+ }
+ topo.UnlinkChildNode(topology.NodeId("dc3"))
+ if topo.GetActiveVolumeCount() != 12 {
+ t.Fail()
+ }
+}
+
+func TestReserveOneVolume(t *testing.T) {
+ topo := setup(topologyLayout)
+ rand.Seed(time.Now().UnixNano())
+ vg := &VolumeGrowth{copy1factor: 3, copy2factor: 2, copy3factor: 1, copyAll: 4}
+ if c, e := vg.GrowByCountAndType(1, storage.Copy000, topo); e == nil {
+ t.Log("reserved", c)
+ } else {
+ t.Error("failed to reserve a volume:", e)
+ }
+}