author     Chris Lu <chris.lu@gmail.com>  2014-04-13 01:29:52 -0700
committer  Chris Lu <chris.lu@gmail.com>  2014-04-13 01:29:52 -0700
commit     f7f582ec8698dc43f1a2289dbd06fe0cade7468f (patch)
tree       1b788ffd9b33ef6807e6aaea3bc24b08cbf10fa8 /go/topology
parent     008aee0dc1932f75c86e52893044d9cd953ef405 (diff)
download   seaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.tar.xz
           seaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.zip
1. refactoring: merge the "replication" logic into the "topology" package
2. when growing volumes, additional preferred "rack" and "dataNode" parameters can now be provided; previously only the "dataCenter" parameter was supported.
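For illustration only (not part of this commit): a minimal sketch of how a caller might pass the new preferred "rack" and "dataNode" parameters through the VolumeGrowOption struct and AutomaticGrowByType added below. The collection name, the "001" placement string, and the data center/rack/node ids are made-up placeholders.

package example

import (
	"code.google.com/p/weed-fs/go/glog"
	"code.google.com/p/weed-fs/go/storage"
	"code.google.com/p/weed-fs/go/topology"
)

// growWithPreferredLocation sketches the new call path: besides the
// previously supported DataCenter preference, Rack and DataNode can now
// also be set on VolumeGrowOption. All literal values are placeholders.
func growWithPreferredLocation(topo *topology.Topology) (int, error) {
	rp, err := storage.NewReplicaPlacementFromString("001") // assumed placement
	if err != nil {
		return 0, err
	}
	option := &topology.VolumeGrowOption{
		Collection:       "pictures",  // hypothetical collection
		ReplicaPlacement: rp,
		DataCenter:       "dc1",       // previously the only supported preference
		Rack:             "rack1",     // new in this commit
		DataNode:         "server111", // new in this commit
	}
	vg := topology.NewDefaultVolumeGrowth()
	count, err := vg.AutomaticGrowByType(option, topo)
	if err != nil {
		glog.V(0).Infoln("volume growth failed:", err)
	}
	return count, err
}

The running master would supply the *topology.Topology value; only the option construction is new here.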
Diffstat (limited to 'go/topology')
-rw-r--r--  go/topology/allocate_volume.go      32
-rw-r--r--  go/topology/configuration.go        24
-rw-r--r--  go/topology/data_node.go            10
-rw-r--r--  go/topology/store_replicate.go      96
-rw-r--r--  go/topology/topology.go              9
-rw-r--r--  go/topology/volume_growth.go       162
-rw-r--r--  go/topology/volume_growth_test.go  127
-rw-r--r--  go/topology/volume_layout.go        24
8 files changed, 464 insertions, 20 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
new file mode 100644
index 000000000..77b4ac508
--- /dev/null
+++ b/go/topology/allocate_volume.go
@@ -0,0 +1,32 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/util"
+ "encoding/json"
+ "errors"
+ "net/url"
+)
+
+type AllocateVolumeResult struct {
+ Error string
+}
+
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("collection", collection)
+ values.Add("replication", rp.String())
+ jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
+ if err != nil {
+ return err
+ }
+ var ret AllocateVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
index 058600a7c..ffcebb59c 100644
--- a/go/topology/configuration.go
+++ b/go/topology/configuration.go
@@ -47,19 +47,19 @@ func (c *Configuration) String() string {
}
func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
- if dcName == "" {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
- }
- }
- } else {
- if rackName == "" {
- return dcName, "DefaultRack"
- } else {
- return dcName, rackName
+ if c != nil && c.ip2location != nil {
+ if loc, ok := c.ip2location[ip]; ok {
+ return loc.dcName, loc.rackName
}
}
- return "DefaultDataCenter", "DefaultRack"
+ if dcName == "" {
+ dcName = "DefaultDataCenter"
+ }
+
+ if rackName == "" {
+ rackName = "DefaultRack"
+ }
+
+ return dcName, rackName
}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index 0cedb5cfe..ae80e08bb 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -24,6 +24,7 @@ func NewDataNode(id string) *DataNode {
s.NodeImpl.value = s
return s
}
+
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
@@ -36,6 +37,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.volumes[v.Id] = v
}
}
+
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
@@ -53,9 +55,15 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
dn.AddOrUpdateVolume(v)
}
}
+
func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
}
+
+func (dn *DataNode) GetRack() *Rack {
+ return dn.Parent().(*NodeImpl).value.(*Rack)
+}
+
func (dn *DataNode) GetTopology() *Topology {
p := dn.Parent()
for p.Parent() != nil {
@@ -64,9 +72,11 @@ func (dn *DataNode) GetTopology() *Topology {
t := p.(*Topology)
return t
}
+
func (dn *DataNode) MatchLocation(ip string, port int) bool {
return dn.Ip == ip && dn.Port == port
}
+
func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port)
}
diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go
new file mode 100644
index 000000000..a982cebe5
--- /dev/null
+++ b/go/topology/store_replicate.go
@@ -0,0 +1,96 @@
+package topology
+
+import (
+ "bytes"
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/operation"
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/util"
+ "net/http"
+ "strconv"
+)
+
+func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
+ ret, err := s.Write(volumeId, needle)
+ needToReplicate := !s.HasVolume(volumeId)
+ if err != nil {
+ errorStatus = "Failed to write to local disk (" + err.Error() + ")"
+ } else if ret > 0 {
+ needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
+ } else {
+ errorStatus = "Failed to write to local disk"
+ }
+ if !needToReplicate && ret > 0 {
+ needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "replicate" {
+ if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
+ _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
+ return err == nil
+ }) {
+ ret = 0
+ errorStatus = "Failed to write to replicas for volume " + volumeId.String()
+ }
+ }
+ }
+ if errorStatus != "" {
+ if _, err = s.Delete(volumeId, needle); err != nil {
+ errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
+ volumeId.String() + ": " + err.Error()
+ } else {
+ distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
+ return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
+ })
+ }
+ }
+ size = ret
+ return
+}
+
+func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
+ ret, err := store.Delete(volumeId, n)
+ if err != nil {
+ glog.V(0).Infoln("delete error:", err)
+ return
+ }
+
+ needToReplicate := !store.HasVolume(volumeId)
+ if !needToReplicate && ret > 0 {
+ needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "replicate" {
+ if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
+ return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
+ }) {
+ ret = 0
+ }
+ }
+ }
+ return
+}
+
+func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
+ if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
+ length := 0
+ selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
+ results := make(chan bool)
+ for _, location := range lookupResult.Locations {
+ if location.Url != selfUrl {
+ length++
+ go func(location operation.Location, results chan bool) {
+ results <- op(location)
+ }(location, results)
+ }
+ }
+ ret := true
+ for i := 0; i < length; i++ {
+ ret = ret && <-results
+ }
+ return ret
+ } else {
+ glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
+ }
+ return false
+}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 6c5bde304..b1fa3f2a2 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -108,8 +108,13 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return next
}
-func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter)
+func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ return vl.GetActiveVolumeCount(option) > 0
+}
+
+func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
new file mode 100644
index 000000000..ee6233364
--- /dev/null
+++ b/go/topology/volume_growth.go
@@ -0,0 +1,162 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+ "fmt"
+ "math/rand"
+ "sync"
+)
+
+/*
+This package is created to resolve these replica placement issues:
+1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
+2. in time of tight storage, how to reduce replica level
+3. optimizing for hot data on faster disk, cold data on cheaper storage,
+4. volume allocation for each bucket
+*/
+
+type VolumeGrowOption struct {
+ Collection string
+ ReplicaPlacement *storage.ReplicaPlacement
+ DataCenter string
+ Rack string
+ DataNode string
+}
+
+type VolumeGrowth struct {
+ accessLock sync.Mutex
+}
+
+func NewDefaultVolumeGrowth() *VolumeGrowth {
+ return &VolumeGrowth{}
+}
+
+// one replication type may need rp.GetCopyCount() actual volumes
+// given copyCount, how many logical volumes to create
+func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
+ switch copyCount {
+ case 1:
+ count = 7
+ case 2:
+ count = 6
+ case 3:
+ count = 3
+ default:
+ count = 1
+ }
+ return
+}
+
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
+ count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+ if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
+ return count, nil
+ }
+ return count, err
+}
+func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+ vg.accessLock.Lock()
+ defer vg.accessLock.Unlock()
+
+ for i := 0; i < targetCount; i++ {
+ if c, e := vg.findAndGrow(topo, option); e == nil {
+ counter += c
+ } else {
+ return counter, e
+ }
+ }
+ return
+}
+
+func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+ servers, e := vg.findEmptySlotsForOneVolume(topo, option)
+ if e != nil {
+ return 0, e
+ }
+ vid := topo.NextVolumeId()
+ err := vg.grow(topo, vid, option, servers...)
+ return len(servers), err
+}
+
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
+ //find main datacenter and other data centers
+ rp := option.ReplicaPlacement
+ mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
+ if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
+ return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
+ }
+ if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
+ }
+ return nil
+ })
+ if dc_err != nil {
+ return nil, dc_err
+ }
+
+ //find main rack and other racks
+ mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
+ if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
+ return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
+ }
+ if node.FreeSpace() < rp.SameRackCount+1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
+ }
+ return nil
+ })
+ if rack_err != nil {
+ return nil, rack_err
+ }
+
+ //find main server and other servers
+ mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
+ if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
+ return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
+ }
+ if node.FreeSpace() < 1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
+ }
+ return nil
+ })
+ if server_err != nil {
+ return nil, server_err
+ }
+
+ servers = append(servers, mainServer.(*DataNode))
+ for _, server := range otherServers {
+ servers = append(servers, server.(*DataNode))
+ }
+ for _, rack := range otherRacks {
+ r := rand.Intn(rack.FreeSpace())
+ if server, e := rack.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
+ }
+ }
+ for _, datacenter := range otherDataCenters {
+ r := rand.Intn(datacenter.FreeSpace())
+ if server, e := datacenter.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
+ }
+ }
+ return
+}
+
+func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+ for _, server := range servers {
+ if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
+ vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ topo.RegisterVolumeLayout(vi, server)
+ glog.V(0).Infoln("Created Volume", vid, "on", server)
+ } else {
+ glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
+ return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error())
+ }
+ }
+ return nil
+}
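Not part of the commit above, but a worked example of the slot arithmetic in findEmptySlotsForOneVolume, assuming the usual reading of a placement string where the three digits count copies on different data centers, on different racks within the data center, and on the same rack:

package example

import (
	"fmt"

	"code.google.com/p/weed-fs/go/storage"
)

// For a "002" placement, the function picks DiffDataCenterCount+1 = 1 data
// center, DiffRackCount+1 = 1 rack inside it, and SameRackCount+1 = 3 data
// nodes inside that rack, i.e. three copies all on one rack.
func placementCounts() {
	rp, err := storage.NewReplicaPlacementFromString("002")
	if err != nil {
		fmt.Println("bad placement:", err)
		return
	}
	fmt.Println("data centers picked:", rp.DiffDataCenterCount+1) // 1
	fmt.Println("racks picked:", rp.DiffRackCount+1)              // 1
	fmt.Println("servers in main rack:", rp.SameRackCount+1)      // 3
	fmt.Println("total copies:", rp.GetCopyCount())               // 3
}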
diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go
new file mode 100644
index 000000000..7f6bd9489
--- /dev/null
+++ b/go/topology/volume_growth_test.go
@@ -0,0 +1,127 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/sequence"
+ "code.google.com/p/weed-fs/go/storage"
+ "encoding/json"
+ "fmt"
+ "testing"
+)
+
+var topologyLayout = `
+{
+ "dc1":{
+ "rack1":{
+ "server111":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":2, "size":12312},
+ {"id":3, "size":12312}
+ ],
+ "limit":3
+ },
+ "server112":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":10
+ }
+ },
+ "rack2":{
+ "server121":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":4
+ },
+ "server122":{
+ "volumes":[],
+ "limit":4
+ },
+ "server123":{
+ "volumes":[
+ {"id":2, "size":12312},
+ {"id":3, "size":12312},
+ {"id":4, "size":12312}
+ ],
+ "limit":5
+ }
+ }
+ },
+ "dc2":{
+ },
+ "dc3":{
+ "rack2":{
+ "server321":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":3, "size":12312},
+ {"id":5, "size":12312}
+ ],
+ "limit":4
+ }
+ }
+ }
+}
+`
+
+func setup(topologyLayout string) *Topology {
+ var data interface{}
+ err := json.Unmarshal([]byte(topologyLayout), &data)
+ if err != nil {
+ fmt.Println("error:", err)
+ }
+ fmt.Println("data:", data)
+
+ //need to connect all nodes first before server adding volumes
+ topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
+ sequence.NewMemorySequencer(), 32*1024, 5)
+ if err != nil {
+ panic("error: " + err.Error())
+ }
+ mTopology := data.(map[string]interface{})
+ for dcKey, dcValue := range mTopology {
+ dc := NewDataCenter(dcKey)
+ dcMap := dcValue.(map[string]interface{})
+ topo.LinkChildNode(dc)
+ for rackKey, rackValue := range dcMap {
+ rack := NewRack(rackKey)
+ rackMap := rackValue.(map[string]interface{})
+ dc.LinkChildNode(rack)
+ for serverKey, serverValue := range rackMap {
+ server := NewDataNode(serverKey)
+ serverMap := serverValue.(map[string]interface{})
+ rack.LinkChildNode(server)
+ for _, v := range serverMap["volumes"].([]interface{}) {
+ m := v.(map[string]interface{})
+ vi := storage.VolumeInfo{
+ Id: storage.VolumeId(int64(m["id"].(float64))),
+ Size: uint64(m["size"].(float64)),
+ Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ }
+ server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ }
+ }
+ }
+
+ return topo
+}
+
+func TestFindEmptySlotsForOneVolume(t *testing.T) {
+ topo := setup(topologyLayout)
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := storage.NewReplicaPlacementFromString("002")
+ servers, err := vg.findEmptySlotsForOneVolume(topo, &VolumeGrowOption{DataCenter: "dc1", ReplicaPlacement: rp})
+ if err != nil {
+ fmt.Println("finding empty slots error :", err)
+ t.Fail()
+ }
+ for _, server := range servers {
+ fmt.Println("assigned node :", server.Id())
+ }
+}
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index a53e2ae82..bd95cc796 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -71,13 +71,13 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int, option *VolumeGrowOption) (*storage.VolumeId, int, *VolumeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
glog.V(0).Infoln("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
- if dataCenter == "" {
+ if option.DataCenter == "" {
vid := vl.writables[rand.Intn(len_writers)]
locationList := vl.vid2location[vid]
if locationList != nil {
@@ -91,7 +91,13 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
for _, v := range vl.writables {
volumeLocationList := vl.vid2location[v]
for _, dn := range volumeLocationList.list {
- if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
counter++
if rand.Intn(counter) < 1 {
vid, locationList = v, volumeLocationList
@@ -104,14 +110,20 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!")
}
-func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int {
- if dataCenter == "" {
+func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+ if option.DataCenter == "" {
return len(vl.writables)
}
counter := 0
for _, v := range vl.writables {
for _, dn := range vl.vid2location[v].list {
- if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
counter++
}
}