Diffstat (limited to 'go/topology')
-rw-r--r--  go/topology/allocate_volume.go    |  32
-rw-r--r--  go/topology/configuration.go      |  24
-rw-r--r--  go/topology/data_node.go          |  10
-rw-r--r--  go/topology/store_replicate.go    |  96
-rw-r--r--  go/topology/topology.go           |   9
-rw-r--r--  go/topology/volume_growth.go      | 162
-rw-r--r--  go/topology/volume_growth_test.go | 127
-rw-r--r--  go/topology/volume_layout.go      |  24
8 files changed, 464 insertions, 20 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
new file mode 100644
index 000000000..77b4ac508
--- /dev/null
+++ b/go/topology/allocate_volume.go
@@ -0,0 +1,32 @@
+package topology
+
+import (
+	"code.google.com/p/weed-fs/go/storage"
+	"code.google.com/p/weed-fs/go/util"
+	"encoding/json"
+	"errors"
+	"net/url"
+)
+
+type AllocateVolumeResult struct {
+	Error string
+}
+
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
+	values := make(url.Values)
+	values.Add("volume", vid.String())
+	values.Add("collection", collection)
+	values.Add("replication", rp.String())
+	jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
+	if err != nil {
+		return err
+	}
+	var ret AllocateVolumeResult
+	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+		return err
+	}
+	if ret.Error != "" {
+		return errors.New(ret.Error)
+	}
+	return nil
+}
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
index 058600a7c..ffcebb59c 100644
--- a/go/topology/configuration.go
+++ b/go/topology/configuration.go
@@ -47,19 +47,19 @@ func (c *Configuration) String() string {
 }
 
 func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
-	if dcName == "" {
-		if c != nil && c.ip2location != nil {
-			if loc, ok := c.ip2location[ip]; ok {
-				return loc.dcName, loc.rackName
-			}
-		}
-	} else {
-		if rackName == "" {
-			return dcName, "DefaultRack"
-		} else {
-			return dcName, rackName
+	if c != nil && c.ip2location != nil {
+		if loc, ok := c.ip2location[ip]; ok {
+			return loc.dcName, loc.rackName
 		}
 	}
-	return "DefaultDataCenter", "DefaultRack"
+	if dcName == "" {
+		dcName = "DefaultDataCenter"
+	}
+
+	if rackName == "" {
+		rackName = "DefaultRack"
+	}
+
+	return dcName, rackName
 }
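
The rewritten Locate makes the lookup order explicit: a known ip2location entry always wins, the caller-supplied names apply next, and the defaults only fill whatever is still blank. A minimal standalone sketch of that fallback order, with a plain map standing in for the package's unexported ip2location type:

    package main

    import "fmt"

    // locate mirrors the fallback order of the new Configuration.Locate:
    // a known ip mapping wins; otherwise caller-supplied names apply,
    // and defaults cover anything still blank.
    func locate(ip2location map[string][2]string, ip, dcName, rackName string) (string, string) {
    	if loc, ok := ip2location[ip]; ok {
    		return loc[0], loc[1]
    	}
    	if dcName == "" {
    		dcName = "DefaultDataCenter"
    	}
    	if rackName == "" {
    		rackName = "DefaultRack"
    	}
    	return dcName, rackName
    }

    func main() {
    	known := map[string][2]string{"10.0.1.5": {"dc1", "rack1"}}
    	fmt.Println(locate(known, "10.0.1.5", "dc9", "rack9")) // dc1 rack1: the mapping wins
    	fmt.Println(locate(known, "10.0.9.9", "dc2", ""))      // dc2 DefaultRack
    	fmt.Println(locate(known, "10.0.9.9", "", ""))         // DefaultDataCenter DefaultRack
    }

Note this also changes behavior when both a mapping and explicit names exist: previously an explicit dcName bypassed the table, whereas now the table takes precedence.
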
"code.google.com/p/weed-fs/go/util" + "net/http" + "strconv" +) + +func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { + ret, err := s.Write(volumeId, needle) + needToReplicate := !s.HasVolume(volumeId) + if err != nil { + errorStatus = "Failed to write to local disk (" + err.Error() + ")" + } else if ret > 0 { + needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() + } else { + errorStatus = "Failed to write to local disk" + } + if !needToReplicate && ret > 0 { + needToReplicate = s.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "replicate" { + if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { + _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime)) + return err == nil + }) { + ret = 0 + errorStatus = "Failed to write to replicas for volume " + volumeId.String() + } + } + } + if errorStatus != "" { + if _, err = s.Delete(volumeId, needle); err != nil { + errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + + volumeId.String() + ": " + err.Error() + } else { + distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { + return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate") + }) + } + } + size = ret + return +} + +func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) { + ret, err := store.Delete(volumeId, n) + if err != nil { + glog.V(0).Infoln("delete error:", err) + return + } + + needToReplicate := !store.HasVolume(volumeId) + if !needToReplicate && ret > 0 { + needToReplicate = store.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "replicate" { + if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { + return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate") + }) { + ret = 0 + } + } + } + return +} + +func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { + if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { + length := 0 + selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) + results := make(chan bool) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + length++ + go func(location operation.Location, results chan bool) { + results <- op(location) + }(location, results) + } + } + ret := true + for i := 0; i < length; i++ { + ret = ret && <-results + } + return ret + } else { + glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error()) + } + return false +} diff --git a/go/topology/topology.go b/go/topology/topology.go index 6c5bde304..b1fa3f2a2 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -108,8 +108,13 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return next } -func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) { - vid, count, datanodes, err := 
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 6c5bde304..b1fa3f2a2 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -108,8 +108,13 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
 	return next
 }
 
-func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
-	vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter)
+func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
+	vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+	return vl.GetActiveVolumeCount(option) > 0
+}
+
+func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
+	vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
 	if err != nil || datanodes.Length() == 0 {
 		return "", 0, nil, errors.New("No writable volumes avalable!")
 	}
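
Call sites change accordingly: the old positional (collectionName, rp, count, dataCenter) arguments collapse into a single VolumeGrowOption. A hypothetical caller, assuming it lives in this package next to a live *Topology; "pictures" and "dc1" are illustrative values only, not anything defined in this change:

    // assignOne sketches the new calling convention: build one option
    // struct, check writability, then pick.
    func assignOne(topo *Topology, rp *storage.ReplicaPlacement) (string, *DataNode, error) {
    	option := &VolumeGrowOption{
    		Collection:       "pictures", // illustrative
    		ReplicaPlacement: rp,
    		DataCenter:       "dc1", // illustrative
    	}
    	if !topo.HasWriableVolume(option) {
    		return "", nil, errors.New("no writable volume for this option")
    	}
    	id, _, dn, err := topo.PickForWrite(1, option)
    	return id, dn, err
    }
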
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
new file mode 100644
index 000000000..ee6233364
--- /dev/null
+++ b/go/topology/volume_growth.go
@@ -0,0 +1,162 @@
+package topology
+
+import (
+	"code.google.com/p/weed-fs/go/glog"
+	"code.google.com/p/weed-fs/go/storage"
+	"fmt"
+	"math/rand"
+	"sync"
+)
+
+/*
+This package is created to resolve these replica placement issues:
+1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
+2. in time of tight storage, how to reduce replica level
+3. optimizing for hot data on faster disk, cold data on cheaper storage
+4. volume allocation for each bucket
+*/
+
+type VolumeGrowOption struct {
+	Collection       string
+	ReplicaPlacement *storage.ReplicaPlacement
+	DataCenter       string
+	Rack             string
+	DataNode         string
+}
+
+type VolumeGrowth struct {
+	accessLock sync.Mutex
+}
+
+func NewDefaultVolumeGrowth() *VolumeGrowth {
+	return &VolumeGrowth{}
+}
+
+// one replication type may need rp.GetCopyCount() actual volumes
+// given copyCount, how many logical volumes to create
+func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
+	switch copyCount {
+	case 1:
+		count = 7
+	case 2:
+		count = 6
+	case 3:
+		count = 3
+	default:
+		count = 1
+	}
+	return
+}
+
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
+	count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+	if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
+		return count, nil
+	}
+	return count, err
+}
+func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+	vg.accessLock.Lock()
+	defer vg.accessLock.Unlock()
+
+	for i := 0; i < targetCount; i++ {
+		if c, e := vg.findAndGrow(topo, option); e == nil {
+			counter += c
+		} else {
+			return counter, e
+		}
+	}
+	return
+}
+
+func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+	servers, e := vg.findEmptySlotsForOneVolume(topo, option)
+	if e != nil {
+		return 0, e
+	}
+	vid := topo.NextVolumeId()
+	err := vg.grow(topo, vid, option, servers...)
+	return len(servers), err
+}
+
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
+	//find main datacenter and other data centers
+	rp := option.ReplicaPlacement
+	mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
+		if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
+			return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
+		}
+		if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
+		}
+		return nil
+	})
+	if dc_err != nil {
+		return nil, dc_err
+	}
+
+	//find main rack and other racks
+	mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
+		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
+			return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
+		}
+		if node.FreeSpace() < rp.SameRackCount+1 {
+			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
+		}
+		return nil
+	})
+	if rack_err != nil {
+		return nil, rack_err
+	}
+
+	//find main server and other servers
+	mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
+		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
+			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
+		}
+		if node.FreeSpace() < 1 {
+			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
+		}
+		return nil
+	})
+	if server_err != nil {
+		return nil, server_err
+	}
+
+	servers = append(servers, mainServer.(*DataNode))
+	for _, server := range otherServers {
+		servers = append(servers, server.(*DataNode))
+	}
+	for _, rack := range otherRacks {
+		r := rand.Intn(rack.FreeSpace())
+		if server, e := rack.ReserveOneVolume(r); e == nil {
+			servers = append(servers, server)
+		} else {
+			return servers, e
+		}
+	}
+	for _, datacenter := range otherDataCenters {
+		r := rand.Intn(datacenter.FreeSpace())
+		if server, e := datacenter.ReserveOneVolume(r); e == nil {
+			servers = append(servers, server)
+		} else {
+			return servers, e
+		}
+	}
+	return
+}
+
+func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+	for _, server := range servers {
+		if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
+			vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
+			server.AddOrUpdateVolume(vi)
+			topo.RegisterVolumeLayout(vi, server)
+			glog.V(0).Infoln("Created Volume", vid, "on", server)
+		} else {
+			glog.V(0).Infoln("Failed to assign", vid, "to", servers, "error", err)
+			return fmt.Errorf("Failed to assign %s: %s", vid.String(), err.Error())
+		}
+	}
+	return nil
+}
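
The findVolumeCount table is the growth-factor heuristic from point 1 of the package comment: fewer logical volumes are grown per batch as the copy count rises, so the physical volume count per batch stays in the same ballpark. A quick standalone check of that arithmetic:

    package main

    import "fmt"

    // Same table as VolumeGrowth.findVolumeCount above.
    func findVolumeCount(copyCount int) int {
    	switch copyCount {
    	case 1:
    		return 7
    	case 2:
    		return 6
    	case 3:
    		return 3
    	}
    	return 1
    }

    func main() {
    	for copies := 1; copies <= 3; copies++ {
    		logical := findVolumeCount(copies)
    		fmt.Printf("copies=%d -> %d logical volumes, %d physical volumes\n",
    			copies, logical, logical*copies)
    	}
    }

This prints 7, 12, and 9 physical volumes per batch for 1, 2, and 3 copies respectively.
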
{"id":2, "size":12312}, + {"id":3, "size":12312} + ], + "limit":3 + }, + "server112":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":10 + } + }, + "rack2":{ + "server121":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":4 + }, + "server122":{ + "volumes":[], + "limit":4 + }, + "server123":{ + "volumes":[ + {"id":2, "size":12312}, + {"id":3, "size":12312}, + {"id":4, "size":12312} + ], + "limit":5 + } + } + }, + "dc2":{ + }, + "dc3":{ + "rack2":{ + "server321":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":3, "size":12312}, + {"id":5, "size":12312} + ], + "limit":4 + } + } + } +} +` + +func setup(topologyLayout string) *Topology { + var data interface{} + err := json.Unmarshal([]byte(topologyLayout), &data) + if err != nil { + fmt.Println("error:", err) + } + fmt.Println("data:", data) + + //need to connect all nodes first before server adding volumes + topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf", + sequence.NewMemorySequencer(), 32*1024, 5) + if err != nil { + panic("error: " + err.Error()) + } + mTopology := data.(map[string]interface{}) + for dcKey, dcValue := range mTopology { + dc := NewDataCenter(dcKey) + dcMap := dcValue.(map[string]interface{}) + topo.LinkChildNode(dc) + for rackKey, rackValue := range dcMap { + rack := NewRack(rackKey) + rackMap := rackValue.(map[string]interface{}) + dc.LinkChildNode(rack) + for serverKey, serverValue := range rackMap { + server := NewDataNode(serverKey) + serverMap := serverValue.(map[string]interface{}) + rack.LinkChildNode(server) + for _, v := range serverMap["volumes"].([]interface{}) { + m := v.(map[string]interface{}) + vi := storage.VolumeInfo{ + Id: storage.VolumeId(int64(m["id"].(float64))), + Size: uint64(m["size"].(float64)), + Version: storage.CurrentVersion} + server.AddOrUpdateVolume(vi) + } + server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) + } + } + } + + return topo +} + +func TestFindEmptySlotsForOneVolume(t *testing.T) { + topo := setup(topologyLayout) + vg := NewDefaultVolumeGrowth() + rp, _ := storage.NewReplicaPlacementFromString("002") + servers, err := vg.findEmptySlotsForOneVolume(topo, "dc1", rp) + if err != nil { + fmt.Println("finding empty slots error :", err) + t.Fail() + } + for _, server := range servers { + fmt.Println("assigned node :", server.Id()) + } +} diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index a53e2ae82..bd95cc796 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -71,13 +71,13 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { return } -func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) { +func (vl *VolumeLayout) PickForWrite(count int, option *VolumeGrowOption) (*storage.VolumeId, int, *VolumeLocationList, error) { len_writers := len(vl.writables) if len_writers <= 0 { glog.V(0).Infoln("No more writable volumes!") return nil, 0, nil, errors.New("No more writable volumes!") } - if dataCenter == "" { + if option.DataCenter == "" { vid := vl.writables[rand.Intn(len_writers)] locationList := vl.vid2location[vid] if locationList != nil { @@ -91,7 +91,13 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol for _, v := range vl.writables { volumeLocationList := vl.vid2location[v] for _, dn := range volumeLocationList.list { - if dn.GetDataCenter().Id() == NodeId(dataCenter) { 
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index a53e2ae82..bd95cc796 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -71,13 +71,13 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
 	return
 }
 
-func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int, option *VolumeGrowOption) (*storage.VolumeId, int, *VolumeLocationList, error) {
 	len_writers := len(vl.writables)
 	if len_writers <= 0 {
 		glog.V(0).Infoln("No more writable volumes!")
 		return nil, 0, nil, errors.New("No more writable volumes!")
 	}
-	if dataCenter == "" {
+	if option.DataCenter == "" {
 		vid := vl.writables[rand.Intn(len_writers)]
 		locationList := vl.vid2location[vid]
 		if locationList != nil {
@@ -91,7 +91,13 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
 	for _, v := range vl.writables {
 		volumeLocationList := vl.vid2location[v]
 		for _, dn := range volumeLocationList.list {
-			if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+			if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+				if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+					continue
+				}
+				if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+					continue
+				}
 				counter++
 				if rand.Intn(counter) < 1 {
 					vid, locationList = v, volumeLocationList
@@ -104,14 +110,20 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
 	return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!")
 }
 
-func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int {
-	if dataCenter == "" {
+func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+	if option.DataCenter == "" {
 		return len(vl.writables)
 	}
 	counter := 0
 	for _, v := range vl.writables {
 		for _, dn := range vl.vid2location[v].list {
-			if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+			if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+				if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+					continue
+				}
+				if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+					continue
+				}
 				counter++
 			}
 		}
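
One detail worth calling out in PickForWrite: the rand.Intn(counter) < 1 test inside the loop is size-one reservoir sampling. Each volume that survives the data-center, rack, and node filters replaces the current pick with probability 1/counter, which yields a uniformly random choice over the filtered set without materializing it first. A standalone illustration:

    package main

    import (
    	"fmt"
    	"math/rand"
    )

    // pickUniform chooses uniformly among matching candidates in one pass,
    // the same way PickForWrite picks a writable volume.
    func pickUniform(candidates []int, matches func(int) bool) (picked int, ok bool) {
    	counter := 0
    	for _, c := range candidates {
    		if !matches(c) {
    			continue
    		}
    		counter++
    		if rand.Intn(counter) < 1 { // probability 1/counter
    			picked, ok = c, true
    		}
    	}
    	return
    }

    func main() {
    	vols := []int{1, 2, 3, 4, 5, 6}
    	v, ok := pickUniform(vols, func(v int) bool { return v%2 == 0 })
    	fmt.Println(v, ok) // uniformly one of 2, 4, 6
    }
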
