| field | value | date |
|---|---|---|
| author | Chris Lu <chris.lu@gmail.com> | 2014-04-13 01:29:52 -0700 |
| committer | Chris Lu <chris.lu@gmail.com> | 2014-04-13 01:29:52 -0700 |
| commit | f7f582ec8698dc43f1a2289dbd06fe0cade7468f (patch) | |
| tree | 1b788ffd9b33ef6807e6aaea3bc24b08cbf10fa8 /go/replication | |
| parent | 008aee0dc1932f75c86e52893044d9cd953ef405 (diff) | |
| download | seaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.tar.xz seaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.zip | |
1. Refactoring: merge the "replication" logic into the "topology" package.
2. When growing volumes, additional preferred "rack" and "dataNode"
parameters are also provided. Previously only the "dataCenter" parameter
was provided.
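
To illustrate the API change in point 2, here is a minimal sketch of the pre-refactor call path that this commit deletes, where only a preferred "dataCenter" could be supplied when growing volumes; after the merge into the "topology" package, the equivalent grow call also accepts preferred "rack" and "dataNode" hints (the relocated signatures are not part of this diff, so only the deleted API is shown). The collection name and target count below are illustrative assumptions.

```go
package main

import (
    "fmt"

    "code.google.com/p/weed-fs/go/replication"
    "code.google.com/p/weed-fs/go/storage"
    "code.google.com/p/weed-fs/go/topology"
)

// growWithPreferredDC uses the pre-refactor API removed by this commit:
// the grow call could only be steered with a preferred data center.
// After this commit, preferred "rack" and "dataNode" parameters exist as well.
func growWithPreferredDC(topo *topology.Topology) error {
    vg := replication.NewDefaultVolumeGrowth()
    // "002": two extra copies on the same rack in the same data center.
    rp, err := storage.NewReplicaPlacementFromString("002")
    if err != nil {
        return err
    }
    // Hypothetical collection name and count, for illustration only.
    count, err := vg.GrowByCountAndType(3, "pictures", rp, "dc1", topo)
    if err != nil {
        return fmt.Errorf("grew only %d volumes: %v", count, err)
    }
    fmt.Println("grew", count, "volumes preferring dc1")
    return nil
}
```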
Diffstat (limited to 'go/replication')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | go/replication/allocate_volume.go | 33 |
| -rw-r--r-- | go/replication/store_replicate.go | 96 |
| -rw-r--r-- | go/replication/volume_growth.go | 148 |
| -rw-r--r-- | go/replication/volume_growth_test.go | 128 |
4 files changed, 0 insertions, 405 deletions
```diff
diff --git a/go/replication/allocate_volume.go b/go/replication/allocate_volume.go
deleted file mode 100644
index fb40c6353..000000000
--- a/go/replication/allocate_volume.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package replication
-
-import (
-    "code.google.com/p/weed-fs/go/storage"
-    "code.google.com/p/weed-fs/go/topology"
-    "code.google.com/p/weed-fs/go/util"
-    "encoding/json"
-    "errors"
-    "net/url"
-)
-
-type AllocateVolumeResult struct {
-    Error string
-}
-
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
-    values := make(url.Values)
-    values.Add("volume", vid.String())
-    values.Add("collection", collection)
-    values.Add("replication", rp.String())
-    jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
-    if err != nil {
-        return err
-    }
-    var ret AllocateVolumeResult
-    if err := json.Unmarshal(jsonBlob, &ret); err != nil {
-        return err
-    }
-    if ret.Error != "" {
-        return errors.New(ret.Error)
-    }
-    return nil
-}
```

```diff
diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go
deleted file mode 100644
index 249e7e3e6..000000000
--- a/go/replication/store_replicate.go
+++ /dev/null
@@ -1,96 +0,0 @@
-package replication
-
-import (
-    "bytes"
-    "code.google.com/p/weed-fs/go/glog"
-    "code.google.com/p/weed-fs/go/operation"
-    "code.google.com/p/weed-fs/go/storage"
-    "code.google.com/p/weed-fs/go/util"
-    "net/http"
-    "strconv"
-)
-
-func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
-    ret, err := s.Write(volumeId, needle)
-    needToReplicate := !s.HasVolume(volumeId)
-    if err != nil {
-        errorStatus = "Failed to write to local disk (" + err.Error() + ")"
-    } else if ret > 0 {
-        needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
-    } else {
-        errorStatus = "Failed to write to local disk"
-    }
-    if !needToReplicate && ret > 0 {
-        needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
-    }
-    if needToReplicate { //send to other replica locations
-        if r.FormValue("type") != "replicate" {
-            if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
-                _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
-                return err == nil
-            }) {
-                ret = 0
-                errorStatus = "Failed to write to replicas for volume " + volumeId.String()
-            }
-        }
-    }
-    if errorStatus != "" {
-        if _, err = s.Delete(volumeId, needle); err != nil {
-            errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
-                volumeId.String() + ": " + err.Error()
-        } else {
-            distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
-                return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
-            })
-        }
-    }
-    size = ret
-    return
-}
-
-func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
-    ret, err := store.Delete(volumeId, n)
-    if err != nil {
-        glog.V(0).Infoln("delete error:", err)
-        return
-    }
-
-    needToReplicate := !store.HasVolume(volumeId)
-    if !needToReplicate && ret > 0 {
-        needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
-    }
-    if needToReplicate { //send to other replica locations
-        if r.FormValue("type") != "replicate" {
-            if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
-                return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
-            }) {
-                ret = 0
-            }
-        }
-    }
-    return
-}
-
-func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
-    if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
-        length := 0
-        selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
-        results := make(chan bool)
-        for _, location := range lookupResult.Locations {
-            if location.Url != selfUrl {
-                length++
-                go func(location operation.Location, results chan bool) {
-                    results <- op(location)
-                }(location, results)
-            }
-        }
-        ret := true
-        for i := 0; i < length; i++ {
-            ret = ret && <-results
-        }
-        return ret
-    } else {
-        glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
-    }
-    return false
-}
```

```diff
diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go
deleted file mode 100644
index 33dfc570e..000000000
--- a/go/replication/volume_growth.go
+++ /dev/null
@@ -1,148 +0,0 @@
-package replication
-
-import (
-    "code.google.com/p/weed-fs/go/glog"
-    "code.google.com/p/weed-fs/go/storage"
-    "code.google.com/p/weed-fs/go/topology"
-    "fmt"
-    "math/rand"
-    "sync"
-)
-
-/*
-This package is created to resolve these replica placement issues:
-1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
-2. in time of tight storage, how to reduce replica level
-3. optimizing for hot data on faster disk, cold data on cheaper storage,
-4. volume allocation for each bucket
-*/
-
-type VolumeGrowth struct {
-    accessLock sync.Mutex
-}
-
-func NewDefaultVolumeGrowth() *VolumeGrowth {
-    return &VolumeGrowth{}
-}
-
-// one replication type may need rp.GetCopyCount() actual volumes
-// given copyCount, how many logical volumes to create
-func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
-    switch copyCount {
-    case 1:
-        count = 7
-    case 2:
-        count = 6
-    case 3:
-        count = 3
-    default:
-        count = 1
-    }
-    return
-}
-
-func (vg *VolumeGrowth) AutomaticGrowByType(collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (count int, err error) {
-    count, err = vg.GrowByCountAndType(vg.findVolumeCount(rp.GetCopyCount()), collection, rp, preferredDataCenter, topo)
-    if count > 0 && count%rp.GetCopyCount() == 0 {
-        return count, nil
-    }
-    return count, err
-}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (counter int, err error) {
-    vg.accessLock.Lock()
-    defer vg.accessLock.Unlock()
-
-    for i := 0; i < targetCount; i++ {
-        if c, e := vg.findAndGrow(topo, preferredDataCenter, collection, rp); e == nil {
-            counter += c
-        } else {
-            return counter, e
-        }
-    }
-    return
-}
-
-func (vg *VolumeGrowth) findAndGrow(topo *topology.Topology, preferredDataCenter string, collection string, rp *storage.ReplicaPlacement) (int, error) {
-    servers, e := vg.findEmptySlotsForOneVolume(topo, preferredDataCenter, rp)
-    if e != nil {
-        return 0, e
-    }
-    vid := topo.NextVolumeId()
-    err := vg.grow(topo, vid, collection, rp, servers...)
-    return len(servers), err
-}
-
-func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, preferredDataCenter string, rp *storage.ReplicaPlacement) (servers []*topology.DataNode, err error) {
-    //find main datacenter and other data centers
-    mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node topology.Node) error {
-        if preferredDataCenter != "" && node.IsDataCenter() && node.Id() != topology.NodeId(preferredDataCenter) {
-            return fmt.Errorf("Not matching preferred:%s", preferredDataCenter)
-        }
-        if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
-            return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
-        }
-        return nil
-    })
-    if dc_err != nil {
-        return nil, dc_err
-    }
-
-    //find main rack and other racks
-    mainRack, otherRacks, rack_err := mainDataCenter.(*topology.DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node topology.Node) error {
-        if node.FreeSpace() < rp.SameRackCount+1 {
-            return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
-        }
-        return nil
-    })
-    if rack_err != nil {
-        return nil, rack_err
-    }
-
-    //find main rack and other racks
-    mainServer, otherServers, server_err := mainRack.(*topology.Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node topology.Node) error {
-        if node.FreeSpace() < 1 {
-            return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
-        }
-        return nil
-    })
-    if server_err != nil {
-        return nil, server_err
-    }
-
-    servers = append(servers, mainServer.(*topology.DataNode))
-    for _, server := range otherServers {
-        servers = append(servers, server.(*topology.DataNode))
-    }
-    for _, rack := range otherRacks {
-        r := rand.Intn(rack.FreeSpace())
-        if server, e := rack.ReserveOneVolume(r); e == nil {
-            servers = append(servers, server)
-        } else {
-            return servers, e
-        }
-    }
-    for _, datacenter := range otherDataCenters {
-        r := rand.Intn(datacenter.FreeSpace())
-        if server, e := datacenter.ReserveOneVolume(r); e == nil {
-            servers = append(servers, server)
-        } else {
-            return servers, e
-        }
-    }
-    return
-}
-
-func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement, servers ...*topology.DataNode) error {
-    for _, server := range servers {
-        if err := AllocateVolume(server, vid, collection, rp); err == nil {
-            vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, ReplicaPlacement: rp, Version: storage.CurrentVersion}
-            server.AddOrUpdateVolume(vi)
-            topo.RegisterVolumeLayout(vi, server)
-            glog.V(0).Infoln("Created Volume", vid, "on", server)
-        } else {
-            glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
-            return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error())
-        }
-    }
-    return nil
-}
```

```diff
diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go
deleted file mode 100644
index bb6cbe90e..000000000
--- a/go/replication/volume_growth_test.go
+++ /dev/null
@@ -1,128 +0,0 @@
-package replication
-
-import (
-    "code.google.com/p/weed-fs/go/sequence"
-    "code.google.com/p/weed-fs/go/storage"
-    "code.google.com/p/weed-fs/go/topology"
-    "encoding/json"
-    "fmt"
-    "testing"
-)
-
-var topologyLayout = `
-{
-  "dc1":{
-    "rack1":{
-      "server111":{
-        "volumes":[
-          {"id":1, "size":12312},
-          {"id":2, "size":12312},
-          {"id":3, "size":12312}
-        ],
-        "limit":3
-      },
-      "server112":{
-        "volumes":[
-          {"id":4, "size":12312},
-          {"id":5, "size":12312},
-          {"id":6, "size":12312}
-        ],
-        "limit":10
-      }
-    },
-    "rack2":{
-      "server121":{
-        "volumes":[
-          {"id":4, "size":12312},
-          {"id":5, "size":12312},
-          {"id":6, "size":12312}
-        ],
-        "limit":4
-      },
-      "server122":{
-        "volumes":[],
-        "limit":4
-      },
-      "server123":{
-        "volumes":[
-          {"id":2, "size":12312},
-          {"id":3, "size":12312},
-          {"id":4, "size":12312}
-        ],
-        "limit":5
-      }
-    }
-  },
-  "dc2":{
-  },
-  "dc3":{
-    "rack2":{
-      "server321":{
-        "volumes":[
-          {"id":1, "size":12312},
-          {"id":3, "size":12312},
-          {"id":5, "size":12312}
-        ],
-        "limit":4
-      }
-    }
-  }
-}
-`
-
-func setup(topologyLayout string) *topology.Topology {
-    var data interface{}
-    err := json.Unmarshal([]byte(topologyLayout), &data)
-    if err != nil {
-        fmt.Println("error:", err)
-    }
-    fmt.Println("data:", data)
-
-    //need to connect all nodes first before server adding volumes
-    topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
-        sequence.NewMemorySequencer(), 32*1024, 5)
-    if err != nil {
-        panic("error: " + err.Error())
-    }
-    mTopology := data.(map[string]interface{})
-    for dcKey, dcValue := range mTopology {
-        dc := topology.NewDataCenter(dcKey)
-        dcMap := dcValue.(map[string]interface{})
-        topo.LinkChildNode(dc)
-        for rackKey, rackValue := range dcMap {
-            rack := topology.NewRack(rackKey)
-            rackMap := rackValue.(map[string]interface{})
-            dc.LinkChildNode(rack)
-            for serverKey, serverValue := range rackMap {
-                server := topology.NewDataNode(serverKey)
-                serverMap := serverValue.(map[string]interface{})
-                rack.LinkChildNode(server)
-                for _, v := range serverMap["volumes"].([]interface{}) {
-                    m := v.(map[string]interface{})
-                    vi := storage.VolumeInfo{
-                        Id:      storage.VolumeId(int64(m["id"].(float64))),
-                        Size:    uint64(m["size"].(float64)),
-                        Version: storage.CurrentVersion}
-                    server.AddOrUpdateVolume(vi)
-                }
-                server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
-            }
-        }
-    }
-
-    return topo
-}
-
-func TestFindEmptySlotsForOneVolume(t *testing.T) {
-    topo := setup(topologyLayout)
-    vg := NewDefaultVolumeGrowth()
-    rp, _ := storage.NewReplicaPlacementFromString("002")
-    servers, err := vg.findEmptySlotsForOneVolume(topo, "dc1", rp)
-    if err != nil {
-        fmt.Println("finding empty slots error :", err)
-        t.Fail()
-    }
-    for _, server := range servers {
-        fmt.Println("assigned node :", server.Id())
-    }
-}
```
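
As a side note on the placement string used in the deleted test, here is a small self-contained sketch (not weed-fs code; the struct and parser below are illustrative stand-ins) of how a replication string such as "002" maps onto the node counts that findEmptySlotsForOneVolume asks the topology for: one digit each for additional data centers, additional racks within the main data center, and additional copies on the main rack.

```go
package main

import "fmt"

// replicaPlacement mirrors the three counters the deleted code reads from
// storage.ReplicaPlacement; defined locally so the example runs standalone.
type replicaPlacement struct {
    DiffDataCenterCount int // first digit: copies in other data centers
    DiffRackCount       int // second digit: copies in other racks, same data center
    SameRackCount       int // third digit: copies on the same rack
}

// parsePlacement converts a three-digit placement string into the counters.
func parsePlacement(s string) replicaPlacement {
    return replicaPlacement{
        DiffDataCenterCount: int(s[0] - '0'),
        DiffRackCount:       int(s[1] - '0'),
        SameRackCount:       int(s[2] - '0'),
    }
}

func main() {
    rp := parsePlacement("002")
    // findEmptySlotsForOneVolume picks count+1 nodes at each level.
    fmt.Println("data centers to pick:", rp.DiffDataCenterCount+1) // 1
    fmt.Println("racks in main dc:    ", rp.DiffRackCount+1)       // 1
    fmt.Println("servers in main rack:", rp.SameRackCount+1)       // 3
}
```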
