Diffstat (limited to 'go/topology')
-rw-r--r--  go/topology/allocate_volume.go      32
-rw-r--r--  go/topology/configuration.go        24
-rw-r--r--  go/topology/data_node.go            10
-rw-r--r--  go/topology/store_replicate.go      96
-rw-r--r--  go/topology/topology.go              9
-rw-r--r--  go/topology/volume_growth.go       162
-rw-r--r--  go/topology/volume_growth_test.go  127
-rw-r--r--  go/topology/volume_layout.go        24
8 files changed, 464 insertions, 20 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
new file mode 100644
index 000000000..77b4ac508
--- /dev/null
+++ b/go/topology/allocate_volume.go
@@ -0,0 +1,32 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/util"
+ "encoding/json"
+ "errors"
+ "net/url"
+)
+
+type AllocateVolumeResult struct {
+ Error string
+}
+
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("collection", collection)
+ values.Add("replication", rp.String())
+ jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
+ if err != nil {
+ return err
+ }
+ var ret AllocateVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
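
For context, a minimal sketch of how a caller inside the topology package might use the new AllocateVolume helper; the data node, volume id, collection name, and placement string below are illustrative values, not part of this change.

// Illustrative only: ask one volume server to create a new volume.
// dn is assumed to be a registered *DataNode; "pictures" and "001" are example values.
func allocateExample(dn *DataNode, vid storage.VolumeId) error {
	rp, err := storage.NewReplicaPlacementFromString("001") // "001": one extra copy on another server in the same rack
	if err != nil {
		return err
	}
	return AllocateVolume(dn, vid, "pictures", rp)
}
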
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
index 058600a7c..ffcebb59c 100644
--- a/go/topology/configuration.go
+++ b/go/topology/configuration.go
@@ -47,19 +47,19 @@ func (c *Configuration) String() string {
}
func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
- if dcName == "" {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
- }
- }
- } else {
- if rackName == "" {
- return dcName, "DefaultRack"
- } else {
- return dcName, rackName
+ if c != nil && c.ip2location != nil {
+ if loc, ok := c.ip2location[ip]; ok {
+ return loc.dcName, loc.rackName
}
}
- return "DefaultDataCenter", "DefaultRack"
+ if dcName == "" {
+ dcName = "DefaultDataCenter"
+ }
+
+ if rackName == "" {
+ rackName = "DefaultRack"
+ }
+
+ return dcName, rackName
}
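
The reworked Locate now lets the ip2location table win outright and only afterwards fills in defaults for whatever is missing. A rough illustration of the resulting behavior, assuming the ip below has no ip2location entry (the names are made up):

// c is a *Configuration; "10.0.1.5" is assumed absent from c.ip2location.
dc, rack := c.Locate("10.0.1.5", "", "")      // -> "DefaultDataCenter", "DefaultRack"
dc, rack = c.Locate("10.0.1.5", "dc1", "")    // -> "dc1", "DefaultRack"
dc, rack = c.Locate("10.0.1.5", "dc1", "rk3") // -> "dc1", "rk3"
// When the ip does appear in ip2location, its dcName/rackName are returned regardless of the arguments.
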
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index 0cedb5cfe..ae80e08bb 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -24,6 +24,7 @@ func NewDataNode(id string) *DataNode {
s.NodeImpl.value = s
return s
}
+
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
@@ -36,6 +37,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.volumes[v.Id] = v
}
}
+
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
@@ -53,9 +55,15 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
dn.AddOrUpdateVolume(v)
}
}
+
func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
}
+
+func (dn *DataNode) GetRack() *Rack {
+ return dn.Parent().(*NodeImpl).value.(*Rack)
+}
+
func (dn *DataNode) GetTopology() *Topology {
p := dn.Parent()
for p.Parent() != nil {
@@ -64,9 +72,11 @@ func (dn *DataNode) GetTopology() *Topology {
t := p.(*Topology)
return t
}
+
func (dn *DataNode) MatchLocation(ip string, port int) bool {
return dn.Ip == ip && dn.Port == port
}
+
func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port)
}
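
The only functional addition here is GetRack, which mirrors GetDataCenter one level lower in the node tree; for example, with dn being any linked *DataNode:

rackId := dn.GetRack().Id()       // the rack this node is linked under
dcId := dn.GetDataCenter().Id()   // the data center two levels up
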
diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go
new file mode 100644
index 000000000..a982cebe5
--- /dev/null
+++ b/go/topology/store_replicate.go
@@ -0,0 +1,96 @@
+package topology
+
+import (
+ "bytes"
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/operation"
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/util"
+ "net/http"
+ "strconv"
+)
+
+func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
+ ret, err := s.Write(volumeId, needle)
+ needToReplicate := !s.HasVolume(volumeId)
+ if err != nil {
+ errorStatus = "Failed to write to local disk (" + err.Error() + ")"
+ } else if ret > 0 {
+ needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
+ } else {
+ errorStatus = "Failed to write to local disk"
+ }
+ if !needToReplicate && ret > 0 {
+ needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "replicate" {
+ if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
+ _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
+ return err == nil
+ }) {
+ ret = 0
+ errorStatus = "Failed to write to replicas for volume " + volumeId.String()
+ }
+ }
+ }
+ if errorStatus != "" {
+ if _, err = s.Delete(volumeId, needle); err != nil {
+ errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " +
+ volumeId.String() + ": " + err.Error()
+ } else {
+ distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
+ return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
+ })
+ }
+ }
+ size = ret
+ return
+}
+
+func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
+ ret, err := store.Delete(volumeId, n)
+ if err != nil {
+ glog.V(0).Infoln("delete error:", err)
+ return
+ }
+
+ needToReplicate := !store.HasVolume(volumeId)
+ if !needToReplicate && ret > 0 {
+ needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "replicate" {
+ if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
+ return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
+ }) {
+ ret = 0
+ }
+ }
+ }
+ return
+}
+
+func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
+ if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
+ length := 0
+ selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
+ results := make(chan bool)
+ for _, location := range lookupResult.Locations {
+ if location.Url != selfUrl {
+ length++
+ go func(location operation.Location, results chan bool) {
+ results <- op(location)
+ }(location, results)
+ }
+ }
+ ret := true
+ for i := 0; i < length; i++ {
+ ret = ret && <-results
+ }
+ return ret
+ } else {
+ glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error())
+ }
+ return false
+}
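
Both entry points are meant to be driven from volume server HTTP handlers, with the type=replicate form value marking fan-out requests so replicas do not re-replicate. A rough sketch of a delete handler delegating to ReplicatedDelete; the handler shape and its JSON response are assumptions for illustration only:

// Sketch only: volumeId and n are assumed to have been parsed from the request already.
func handleDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId,
	n *storage.Needle, w http.ResponseWriter, r *http.Request) {
	size := ReplicatedDelete(masterNode, store, volumeId, n, r)
	ret, _ := json.Marshal(map[string]uint32{"size": size}) // report the reclaimed size as JSON
	w.Write(ret)
}
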
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 6c5bde304..b1fa3f2a2 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -108,8 +108,13 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return next
}
-func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter)
+func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ return vl.GetActiveVolumeCount(option) > 0
+}
+
+func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
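
With both methods now keyed off a single VolumeGrowOption, a master-side caller can check writability and pick a node against the same filter. A hedged sketch, where topo is an initialized *Topology, rp is a parsed replica placement, and the option values are examples:

option := &VolumeGrowOption{Collection: "pictures", ReplicaPlacement: rp, DataCenter: "dc1"}
if !topo.HasWritableVolume(option) {
	// no matching writable volume; grow first, e.g. via NewDefaultVolumeGrowth().AutomaticGrowByType(option, topo)
}
fid, count, dn, err := topo.PickForWrite(1, option)
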
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
new file mode 100644
index 000000000..ee6233364
--- /dev/null
+++ b/go/topology/volume_growth.go
@@ -0,0 +1,162 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/storage"
+ "fmt"
+ "math/rand"
+ "sync"
+)
+
+/*
+This package is created to resolve these replica placement issues:
+1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
+2. in times of tight storage, how to reduce the replica level
+3. optimizing for hot data on faster disks, cold data on cheaper storage
+4. volume allocation for each bucket
+*/
+
+type VolumeGrowOption struct {
+ Collection string
+ ReplicaPlacement *storage.ReplicaPlacement
+ DataCenter string
+ Rack string
+ DataNode string
+}
+
+type VolumeGrowth struct {
+ accessLock sync.Mutex
+}
+
+func NewDefaultVolumeGrowth() *VolumeGrowth {
+ return &VolumeGrowth{}
+}
+
+// Each replica placement needs rp.GetCopyCount() physical volumes per logical volume.
+// Given that copy count, decide how many logical volumes to create in one batch.
+func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
+ switch copyCount {
+ case 1:
+ count = 7
+ case 2:
+ count = 6
+ case 3:
+ count = 3
+ default:
+ count = 1
+ }
+ return
+}
+
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
+ count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+ if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
+ return count, nil
+ }
+ return count, err
+}
+func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+ vg.accessLock.Lock()
+ defer vg.accessLock.Unlock()
+
+ for i := 0; i < targetCount; i++ {
+ if c, e := vg.findAndGrow(topo, option); e == nil {
+ counter += c
+ } else {
+ return counter, e
+ }
+ }
+ return
+}
+
+func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+ servers, e := vg.findEmptySlotsForOneVolume(topo, option)
+ if e != nil {
+ return 0, e
+ }
+ vid := topo.NextVolumeId()
+ err := vg.grow(topo, vid, option, servers...)
+ return len(servers), err
+}
+
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
+ //find main datacenter and other data centers
+ rp := option.ReplicaPlacement
+ mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
+ if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
+ return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
+ }
+ if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
+ }
+ return nil
+ })
+ if dc_err != nil {
+ return nil, dc_err
+ }
+
+ //find main rack and other racks
+ mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
+ if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
+ return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
+ }
+ if node.FreeSpace() < rp.SameRackCount+1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
+ }
+ return nil
+ })
+ if rack_err != nil {
+ return nil, rack_err
+ }
+
+ //find main server and other servers in the main rack
+ mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
+ if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
+ return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
+ }
+ if node.FreeSpace() < 1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
+ }
+ return nil
+ })
+ if server_err != nil {
+ return nil, server_err
+ }
+
+ servers = append(servers, mainServer.(*DataNode))
+ for _, server := range otherServers {
+ servers = append(servers, server.(*DataNode))
+ }
+ for _, rack := range otherRacks {
+ r := rand.Intn(rack.FreeSpace())
+ if server, e := rack.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
+ }
+ }
+ for _, datacenter := range otherDataCenters {
+ r := rand.Intn(datacenter.FreeSpace())
+ if server, e := datacenter.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
+ }
+ }
+ return
+}
+
+func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+ for _, server := range servers {
+ if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
+ vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ topo.RegisterVolumeLayout(vi, server)
+ glog.V(0).Infoln("Created Volume", vid, "on", server)
+ } else {
+ glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
+ return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error())
+ }
+ }
+ return nil
+}
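
findEmptySlotsForOneVolume works top-down: it picks the main data center plus rp.DiffDataCenterCount others, then the main rack plus rp.DiffRackCount others inside that data center, then rp.SameRackCount+1 servers inside the main rack, and finally reserves one slot on each of the other racks and data centers. A sketch of driving the growth from the master side, assuming topo is an initialized *Topology; the collection name and placement are example values:

rp, _ := storage.NewReplicaPlacementFromString("110") // one copy in another DC, one in another rack, none extra on the same rack
option := &VolumeGrowOption{
	Collection:       "pictures",
	ReplicaPlacement: rp,
	DataCenter:       "dc1", // optional pin; leave empty to let the picker choose freely
}
count, err := NewDefaultVolumeGrowth().AutomaticGrowByType(option, topo)
// count is the number of physical volumes created; with "110" each logical volume needs 3 of them
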
diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go
new file mode 100644
index 000000000..7f6bd9489
--- /dev/null
+++ b/go/topology/volume_growth_test.go
@@ -0,0 +1,127 @@
+package topology
+
+import (
+ "code.google.com/p/weed-fs/go/sequence"
+ "code.google.com/p/weed-fs/go/storage"
+ "encoding/json"
+ "fmt"
+ "testing"
+)
+
+var topologyLayout = `
+{
+ "dc1":{
+ "rack1":{
+ "server111":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":2, "size":12312},
+ {"id":3, "size":12312}
+ ],
+ "limit":3
+ },
+ "server112":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":10
+ }
+ },
+ "rack2":{
+ "server121":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":4
+ },
+ "server122":{
+ "volumes":[],
+ "limit":4
+ },
+ "server123":{
+ "volumes":[
+ {"id":2, "size":12312},
+ {"id":3, "size":12312},
+ {"id":4, "size":12312}
+ ],
+ "limit":5
+ }
+ }
+ },
+ "dc2":{
+ },
+ "dc3":{
+ "rack2":{
+ "server321":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":3, "size":12312},
+ {"id":5, "size":12312}
+ ],
+ "limit":4
+ }
+ }
+ }
+}
+`
+
+func setup(topologyLayout string) *Topology {
+ var data interface{}
+ err := json.Unmarshal([]byte(topologyLayout), &data)
+ if err != nil {
+ fmt.Println("error:", err)
+ }
+ fmt.Println("data:", data)
+
+ //need to connect all nodes first before the servers add volumes
+ topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
+ sequence.NewMemorySequencer(), 32*1024, 5)
+ if err != nil {
+ panic("error: " + err.Error())
+ }
+ mTopology := data.(map[string]interface{})
+ for dcKey, dcValue := range mTopology {
+ dc := NewDataCenter(dcKey)
+ dcMap := dcValue.(map[string]interface{})
+ topo.LinkChildNode(dc)
+ for rackKey, rackValue := range dcMap {
+ rack := NewRack(rackKey)
+ rackMap := rackValue.(map[string]interface{})
+ dc.LinkChildNode(rack)
+ for serverKey, serverValue := range rackMap {
+ server := NewDataNode(serverKey)
+ serverMap := serverValue.(map[string]interface{})
+ rack.LinkChildNode(server)
+ for _, v := range serverMap["volumes"].([]interface{}) {
+ m := v.(map[string]interface{})
+ vi := storage.VolumeInfo{
+ Id: storage.VolumeId(int64(m["id"].(float64))),
+ Size: uint64(m["size"].(float64)),
+ Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ }
+ server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ }
+ }
+ }
+
+ return topo
+}
+
+func TestFindEmptySlotsForOneVolume(t *testing.T) {
+ topo := setup(topologyLayout)
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := storage.NewReplicaPlacementFromString("002")
+ servers, err := vg.findEmptySlotsForOneVolume(topo, &VolumeGrowOption{DataCenter: "dc1", ReplicaPlacement: rp})
+ if err != nil {
+ fmt.Println("finding empty slots error :", err)
+ t.Fail()
+ }
+ for _, server := range servers {
+ fmt.Println("assigned node :", server.Id())
+ }
+}
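
The test only prints the picked nodes; a small assertion one could layer on top (an addition, not part of this change) is that a successful "002" pick returns exactly rp.GetCopyCount() servers, i.e. three nodes in one rack:

if len(servers) != rp.GetCopyCount() {
	t.Errorf("expected %d servers for placement 002, got %d", rp.GetCopyCount(), len(servers))
}
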
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index a53e2ae82..bd95cc796 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -71,13 +71,13 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int, option *VolumeGrowOption) (*storage.VolumeId, int, *VolumeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
glog.V(0).Infoln("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
- if dataCenter == "" {
+ if option.DataCenter == "" {
vid := vl.writables[rand.Intn(len_writers)]
locationList := vl.vid2location[vid]
if locationList != nil {
@@ -91,7 +91,13 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
for _, v := range vl.writables {
volumeLocationList := vl.vid2location[v]
for _, dn := range volumeLocationList.list {
- if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
counter++
if rand.Intn(counter) < 1 {
vid, locationList = v, volumeLocationList
@@ -104,14 +110,20 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!")
}
-func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int {
- if dataCenter == "" {
+func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+ if option.DataCenter == "" {
return len(vl.writables)
}
counter := 0
for _, v := range vl.writables {
for _, dn := range vl.vid2location[v].list {
- if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
counter++
}
}