aboutsummaryrefslogtreecommitdiff
path: root/go/replication
diff options
context:
space:
mode:
Diffstat (limited to 'go/replication')
-rw-r--r--go/replication/allocate_volume.go33
-rw-r--r--go/replication/store_replicate.go96
-rw-r--r--go/replication/volume_growth.go148
-rw-r--r--go/replication/volume_growth_test.go128
4 files changed, 0 insertions, 405 deletions
diff --git a/go/replication/allocate_volume.go b/go/replication/allocate_volume.go
deleted file mode 100644
index fb40c6353..000000000
--- a/go/replication/allocate_volume.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package replication
-
-import (
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
- "code.google.com/p/weed-fs/go/util"
- "encoding/json"
- "errors"
- "net/url"
-)
-
-type AllocateVolumeResult struct {
- Error string
-}
-
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("collection", collection)
- values.Add("replication", rp.String())
- jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
- if err != nil {
- return err
- }
- var ret AllocateVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go
deleted file mode 100644
index 249e7e3e6..000000000
--- a/go/replication/store_replicate.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package replication
-
-import (
- "bytes"
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/util"
- "net/http"
- "strconv"
-)
-
-func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
- ret, err := s.Write(volumeId, needle)
- needToReplicate := !s.HasVolume(volumeId)
- if err != nil {
- errorStatus = "Failed to write to local disk (" + err.Error() + ")"
- } else if ret > 0 {
- needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
- } else {
- errorStatus = "Failed to write to local disk"
- }
- if !needToReplicate && ret > 0 {
- needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
- if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
- _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
- return err == nil
- }) {
- ret = 0
- errorStatus = "Failed to write to replicas for volume " + volumeId.String()
- }
- }
- }
- if errorStatus != "" {
- if _, err = s.Delete(volumeId, needle); err != nil {
- errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
- volumeId.String() + ": " + err.Error()
- } else {
- distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
- return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
- })
- }
- }
- size = ret
- return
-}
-
-func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
- ret, err := store.Delete(volumeId, n)
- if err != nil {
- glog.V(0).Infoln("delete error:", err)
- return
- }
-
- needToReplicate := !store.HasVolume(volumeId)
- if !needToReplicate && ret > 0 {
- needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
- if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
- return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
- }) {
- ret = 0
- }
- }
- }
- return
-}
-
-func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
- if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
- length := 0
- selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
- results := make(chan bool)
- for _, location := range lookupResult.Locations {
- if location.Url != selfUrl {
- length++
- go func(location operation.Location, results chan bool) {
- results <- op(location)
- }(location, results)
- }
- }
- ret := true
- for i := 0; i < length; i++ {
- ret = ret && <-results
- }
- return ret
- } else {
- glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
- }
- return false
-}
diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go
deleted file mode 100644
index 33dfc570e..000000000
--- a/go/replication/volume_growth.go
+++ /dev/null
@@ -1,148 +0,0 @@
-package replication
-
-import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
- "fmt"
- "math/rand"
- "sync"
-)
-
-/*
-This package is created to resolve these replica placement issues:
-1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
-2. in time of tight storage, how to reduce replica level
-3. optimizing for hot data on faster disk, cold data on cheaper storage,
-4. volume allocation for each bucket
-*/
-
-type VolumeGrowth struct {
- accessLock sync.Mutex
-}
-
-func NewDefaultVolumeGrowth() *VolumeGrowth {
- return &VolumeGrowth{}
-}
-
-// one replication type may need rp.GetCopyCount() actual volumes
-// given copyCount, how many logical volumes to create
-func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
- switch copyCount {
- case 1:
- count = 7
- case 2:
- count = 6
- case 3:
- count = 3
- default:
- count = 1
- }
- return
-}
-
-func (vg *VolumeGrowth) AutomaticGrowByType(collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (count int, err error) {
- count, err = vg.GrowByCountAndType(vg.findVolumeCount(rp.GetCopyCount()), collection, rp, preferredDataCenter, topo)
- if count > 0 && count%rp.GetCopyCount() == 0 {
- return count, nil
- }
- return count, err
-}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (counter int, err error) {
- vg.accessLock.Lock()
- defer vg.accessLock.Unlock()
-
- for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(topo, preferredDataCenter, collection, rp); e == nil {
- counter += c
- } else {
- return counter, e
- }
- }
- return
-}
-
-func (vg *VolumeGrowth) findAndGrow(topo *topology.Topology, preferredDataCenter string, collection string, rp *storage.ReplicaPlacement) (int, error) {
- servers, e := vg.findEmptySlotsForOneVolume(topo, preferredDataCenter, rp)
- if e != nil {
- return 0, e
- }
- vid := topo.NextVolumeId()
- err := vg.grow(topo, vid, collection, rp, servers...)
- return len(servers), err
-}
-
-func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, preferredDataCenter string, rp *storage.ReplicaPlacement) (servers []*topology.DataNode, err error) {
- //find main datacenter and other data centers
- mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node topology.Node) error {
- if preferredDataCenter != "" && node.IsDataCenter() && node.Id() != topology.NodeId(preferredDataCenter) {
- return fmt.Errorf("Not matching preferred:%s", preferredDataCenter)
- }
- if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
- return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
- }
- return nil
- })
- if dc_err != nil {
- return nil, dc_err
- }
-
- //find main rack and other racks
- mainRack, otherRacks, rack_err := mainDataCenter.(*topology.DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node topology.Node) error {
- if node.FreeSpace() < rp.SameRackCount+1 {
- return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
- }
- return nil
- })
- if rack_err != nil {
- return nil, rack_err
- }
-
- //find main rack and other racks
- mainServer, otherServers, server_err := mainRack.(*topology.Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node topology.Node) error {
- if node.FreeSpace() < 1 {
- return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
- }
- return nil
- })
- if server_err != nil {
- return nil, server_err
- }
-
- servers = append(servers, mainServer.(*topology.DataNode))
- for _, server := range otherServers {
- servers = append(servers, server.(*topology.DataNode))
- }
- for _, rack := range otherRacks {
- r := rand.Intn(rack.FreeSpace())
- if server, e := rack.ReserveOneVolume(r); e == nil {
- servers = append(servers, server)
- } else {
- return servers, e
- }
- }
- for _, datacenter := range otherDataCenters {
- r := rand.Intn(datacenter.FreeSpace())
- if server, e := datacenter.ReserveOneVolume(r); e == nil {
- servers = append(servers, server)
- } else {
- return servers, e
- }
- }
- return
-}
-
-func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement, servers ...*topology.DataNode) error {
- for _, server := range servers {
- if err := AllocateVolume(server, vid, collection, rp); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, ReplicaPlacement: rp, Version: storage.CurrentVersion}
- server.AddOrUpdateVolume(vi)
- topo.RegisterVolumeLayout(vi, server)
- glog.V(0).Infoln("Created Volume", vid, "on", server)
- } else {
- glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
- return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error())
- }
- }
- return nil
-}
diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go
deleted file mode 100644
index bb6cbe90e..000000000
--- a/go/replication/volume_growth_test.go
+++ /dev/null
@@ -1,128 +0,0 @@
-package replication
-
-import (
- "code.google.com/p/weed-fs/go/sequence"
- "code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
- "encoding/json"
- "fmt"
- "testing"
-)
-
-var topologyLayout = `
-{
- "dc1":{
- "rack1":{
- "server111":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":3
- },
- "server112":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":10
- }
- },
- "rack2":{
- "server121":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":4
- },
- "server122":{
- "volumes":[],
- "limit":4
- },
- "server123":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":5
- }
- }
- },
- "dc2":{
- },
- "dc3":{
- "rack2":{
- "server321":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":3, "size":12312},
- {"id":5, "size":12312}
- ],
- "limit":4
- }
- }
- }
-}
-`
-
-func setup(topologyLayout string) *topology.Topology {
- var data interface{}
- err := json.Unmarshal([]byte(topologyLayout), &data)
- if err != nil {
- fmt.Println("error:", err)
- }
- fmt.Println("data:", data)
-
- //need to connect all nodes first before server adding volumes
- topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
- sequence.NewMemorySequencer(), 32*1024, 5)
- if err != nil {
- panic("error: " + err.Error())
- }
- mTopology := data.(map[string]interface{})
- for dcKey, dcValue := range mTopology {
- dc := topology.NewDataCenter(dcKey)
- dcMap := dcValue.(map[string]interface{})
- topo.LinkChildNode(dc)
- for rackKey, rackValue := range dcMap {
- rack := topology.NewRack(rackKey)
- rackMap := rackValue.(map[string]interface{})
- dc.LinkChildNode(rack)
- for serverKey, serverValue := range rackMap {
- server := topology.NewDataNode(serverKey)
- serverMap := serverValue.(map[string]interface{})
- rack.LinkChildNode(server)
- for _, v := range serverMap["volumes"].([]interface{}) {
- m := v.(map[string]interface{})
- vi := storage.VolumeInfo{
- Id: storage.VolumeId(int64(m["id"].(float64))),
- Size: uint64(m["size"].(float64)),
- Version: storage.CurrentVersion}
- server.AddOrUpdateVolume(vi)
- }
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
- }
- }
- }
-
- return topo
-}
-
-func TestFindEmptySlotsForOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- vg := NewDefaultVolumeGrowth()
- rp, _ := storage.NewReplicaPlacementFromString("002")
- servers, err := vg.findEmptySlotsForOneVolume(topo, "dc1", rp)
- if err != nil {
- fmt.Println("finding empty slots error :", err)
- t.Fail()
- }
- for _, server := range servers {
- fmt.Println("assigned node :", server.Id())
- }
-}