Diffstat (limited to 'go/topology')
-rw-r--r--  go/topology/allocate_volume.go           35
-rw-r--r--  go/topology/cluster_commands.go          31
-rw-r--r--  go/topology/collection.go                57
-rw-r--r--  go/topology/configuration.go             65
-rw-r--r--  go/topology/configuration_test.go        42
-rw-r--r--  go/topology/data_center.go               40
-rw-r--r--  go/topology/data_node.go                115
-rw-r--r--  go/topology/node.go                     272
-rw-r--r--  go/topology/rack.go                      65
-rw-r--r--  go/topology/store_replicate.go          150
-rw-r--r--  go/topology/topo_test.go                 17
-rw-r--r--  go/topology/topology.go                 189
-rw-r--r--  go/topology/topology_event_handling.go   74
-rw-r--r--  go/topology/topology_map.go              53
-rw-r--r--  go/topology/topology_vacuum.go          158
-rw-r--r--  go/topology/volume_growth.go            211
-rw-r--r--  go/topology/volume_growth_test.go       135
-rw-r--r--  go/topology/volume_layout.go            226
-rw-r--r--  go/topology/volume_location_list.go      65
19 files changed, 0 insertions, 2000 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
deleted file mode 100644
index f014c3527..000000000
--- a/go/topology/allocate_volume.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package topology
-
-import (
- "encoding/json"
- "errors"
- "fmt"
- "net/url"
-
- "github.com/chrislusf/seaweedfs/go/storage"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type AllocateVolumeResult struct {
- Error string
-}
-
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("collection", option.Collection)
- values.Add("replication", option.ReplicaPlacement.String())
- values.Add("ttl", option.Ttl.String())
- jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
- if err != nil {
- return err
- }
- var ret AllocateVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return fmt.Errorf("Invalid JSON result for %s: %s", "/admin/assign_volum", string(jsonBlob))
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
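
The deleted AllocateVolume above follows the master-to-volume-server admin call pattern used throughout this package: a form-encoded POST, then a JSON body whose Error field signals failure. A minimal self-contained sketch of that pattern using only the standard library (the helper name and arguments are illustrative, not part of the package):

    package example

    import (
        "encoding/json"
        "errors"
        "io/ioutil"
        "net/http"
        "net/url"
    )

    // postAdminForm mirrors the call shape above: POST form values to an
    // admin endpoint and decode the {"Error": "..."} JSON reply.
    func postAdminForm(host, path string, values url.Values) error {
        resp, err := http.PostForm("http://"+host+path, values)
        if err != nil {
            return err
        }
        defer resp.Body.Close()
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return err
        }
        var ret struct{ Error string }
        if err := json.Unmarshal(body, &ret); err != nil {
            return err
        }
        if ret.Error != "" {
            return errors.New(ret.Error)
        }
        return nil
    }
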
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go
deleted file mode 100644
index eac93c13c..000000000
--- a/go/topology/cluster_commands.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package topology
-
-import (
- "github.com/chrislusf/raft"
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/storage"
-)
-
-type MaxVolumeIdCommand struct {
- MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
-}
-
-func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
- return &MaxVolumeIdCommand{
- MaxVolumeId: value,
- }
-}
-
-func (c *MaxVolumeIdCommand) CommandName() string {
- return "MaxVolumeId"
-}
-
-func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
- topo := server.Context().(*Topology)
- before := topo.GetMaxVolumeId()
- topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
-
- glog.V(4).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
-
- return nil, nil
-}
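
For context on the command above: goraft-style commands must be registered before use so that persisted log entries can be decoded back into concrete values by their CommandName(). A sketch of that registration, assuming the goraft-style raft.RegisterCommand API; the place where SeaweedFS actually performs it (raft server setup) is outside this diff:

    package topology

    import "github.com/chrislusf/raft"

    func init() {
        // Register the command so the raft log can rebuild
        // MaxVolumeIdCommand values when replaying entries.
        raft.RegisterCommand(&MaxVolumeIdCommand{})
    }
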
diff --git a/go/topology/collection.go b/go/topology/collection.go
deleted file mode 100644
index 6368900c3..000000000
--- a/go/topology/collection.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package topology
-
-import (
- "fmt"
-
- "github.com/chrislusf/seaweedfs/go/storage"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type Collection struct {
- Name string
- volumeSizeLimit uint64
- storageType2VolumeLayout *util.ConcurrentReadMap
-}
-
-func NewCollection(name string, volumeSizeLimit uint64) *Collection {
- c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
- c.storageType2VolumeLayout = util.NewConcurrentReadMap()
- return c
-}
-
-func (c *Collection) String() string {
- return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
-}
-
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
- keyString := rp.String()
- if ttl != nil {
- keyString += ttl.String()
- }
- vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
- return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
- })
- return vl.(*VolumeLayout)
-}
-
-func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range c.storageType2VolumeLayout.Items() {
- if vl != nil {
- if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
- return list
- }
- }
- }
- return nil
-}
-
-func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
- for _, vl := range c.storageType2VolumeLayout.Items() {
- if vl != nil {
- if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
- nodes = append(nodes, list...)
- }
- }
- }
- return
-}
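
GetOrCreateVolumeLayout above leans on the get-or-create semantics of util.ConcurrentReadMap, keyed by replica placement plus TTL. A minimal sketch of that idiom, assuming the usual double-checked locking shape (names are illustrative; the real util.ConcurrentReadMap may differ in detail):

    package example

    import "sync"

    type concurrentReadMap struct {
        sync.RWMutex
        items map[string]interface{}
    }

    func newConcurrentReadMap() *concurrentReadMap {
        return &concurrentReadMap{items: make(map[string]interface{})}
    }

    // Get takes only a read lock on the common hit path and upgrades to a
    // write lock when the entry must be created.
    func (m *concurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
        m.RLock()
        if v, ok := m.items[key]; ok {
            m.RUnlock()
            return v
        }
        m.RUnlock()
        m.Lock()
        defer m.Unlock()
        if v, ok := m.items[key]; ok { // re-check: another writer may have won
            return v
        }
        v := newEntry()
        m.items[key] = v
        return v
    }
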
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
deleted file mode 100644
index ffcebb59c..000000000
--- a/go/topology/configuration.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package topology
-
-import (
- "encoding/xml"
-)
-
-type loc struct {
- dcName string
- rackName string
-}
-type rack struct {
- Name string `xml:"name,attr"`
- Ips []string `xml:"Ip"`
-}
-type dataCenter struct {
- Name string `xml:"name,attr"`
- Racks []rack `xml:"Rack"`
-}
-type topology struct {
- DataCenters []dataCenter `xml:"DataCenter"`
-}
-type Configuration struct {
- XMLName xml.Name `xml:"Configuration"`
- Topo topology `xml:"Topology"`
- ip2location map[string]loc
-}
-
-func NewConfiguration(b []byte) (*Configuration, error) {
- c := &Configuration{}
- err := xml.Unmarshal(b, c)
- c.ip2location = make(map[string]loc)
- for _, dc := range c.Topo.DataCenters {
- for _, rack := range dc.Racks {
- for _, ip := range rack.Ips {
- c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
- }
- }
- }
- return c, err
-}
-
-func (c *Configuration) String() string {
- if b, e := xml.MarshalIndent(c, " ", " "); e == nil {
- return string(b)
- }
- return ""
-}
-
-func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
- }
- }
-
- if dcName == "" {
- dcName = "DefaultDataCenter"
- }
-
- if rackName == "" {
- rackName = "DefaultRack"
- }
-
- return dcName, rackName
-}
diff --git a/go/topology/configuration_test.go b/go/topology/configuration_test.go
deleted file mode 100644
index 0a353d16e..000000000
--- a/go/topology/configuration_test.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package topology
-
-import (
- "fmt"
- "testing"
-)
-
-func TestLoadConfiguration(t *testing.T) {
-
- confContent := `
-
-<?xml version="1.0" encoding="UTF-8" ?>
-<Configuration>
- <Topology>
- <DataCenter name="dc1">
- <Rack name="rack1">
- <Ip>192.168.1.1</Ip>
- </Rack>
- </DataCenter>
- <DataCenter name="dc2">
- <Rack name="rack1">
- <Ip>192.168.1.2</Ip>
- </Rack>
- <Rack name="rack2">
- <Ip>192.168.1.3</Ip>
- <Ip>192.168.1.4</Ip>
- </Rack>
- </DataCenter>
- </Topology>
-</Configuration>
-`
- c, err := NewConfiguration([]byte(confContent))
-
- fmt.Printf("%s\n", c)
- if err != nil {
- t.Fatalf("unmarshal error:%v", err)
- }
-
- if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
- t.Fatalf("unmarshal error:%s", c)
- }
-}
diff --git a/go/topology/data_center.go b/go/topology/data_center.go
deleted file mode 100644
index bcf2dfd31..000000000
--- a/go/topology/data_center.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package topology
-
-type DataCenter struct {
- NodeImpl
-}
-
-func NewDataCenter(id string) *DataCenter {
- dc := &DataCenter{}
- dc.id = NodeId(id)
- dc.nodeType = "DataCenter"
- dc.children = make(map[NodeId]Node)
- dc.NodeImpl.value = dc
- return dc
-}
-
-func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- if string(rack.Id()) == rackName {
- return rack
- }
- }
- rack := NewRack(rackName)
- dc.LinkChildNode(rack)
- return rack
-}
-
-func (dc *DataCenter) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Id"] = dc.Id()
- m["Max"] = dc.GetMaxVolumeCount()
- m["Free"] = dc.FreeSpace()
- var racks []interface{}
- for _, c := range dc.Children() {
- rack := c.(*Rack)
- racks = append(racks, rack.ToMap())
- }
- m["Racks"] = racks
- return m
-}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
deleted file mode 100644
index 3bad8c188..000000000
--- a/go/topology/data_node.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package topology
-
-import (
- "fmt"
- "strconv"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/storage"
-)
-
-type DataNode struct {
- NodeImpl
- volumes map[storage.VolumeId]storage.VolumeInfo
- Ip string
- Port int
- PublicUrl string
- LastSeen int64 // unix time in seconds
- Dead bool
-}
-
-func NewDataNode(id string) *DataNode {
- s := &DataNode{}
- s.id = NodeId(id)
- s.nodeType = "DataNode"
- s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
- s.NodeImpl.value = s
- return s
-}
-
-func (dn *DataNode) String() string {
- dn.RLock()
- defer dn.RUnlock()
- return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
-}
-
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
- dn.Lock()
- defer dn.Unlock()
- if _, ok := dn.volumes[v.Id]; !ok {
- dn.volumes[v.Id] = v
- dn.UpAdjustVolumeCountDelta(1)
- if !v.ReadOnly {
- dn.UpAdjustActiveVolumeCountDelta(1)
- }
- dn.UpAdjustMaxVolumeId(v.Id)
- } else {
- dn.volumes[v.Id] = v
- }
-}
-
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
- actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
- for _, v := range actualVolumes {
- actualVolumeMap[v.Id] = v
- }
- dn.RLock()
- for vid, v := range dn.volumes {
- if _, ok := actualVolumeMap[vid]; !ok {
- glog.V(0).Infoln("Deleting volume id:", vid)
- delete(dn.volumes, vid)
- deletedVolumes = append(deletedVolumes, v)
- dn.UpAdjustVolumeCountDelta(-1)
- dn.UpAdjustActiveVolumeCountDelta(-1)
- }
- } //TODO: adjust max volume id, if need to reclaim volume ids
- dn.RUnlock()
- for _, v := range actualVolumes {
- dn.AddOrUpdateVolume(v)
- }
- return
-}
-
-func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
- dn.RLock()
- for _, v := range dn.volumes {
- ret = append(ret, v)
- }
- dn.RUnlock()
- return ret
-}
-
-func (dn *DataNode) GetDataCenter() *DataCenter {
- return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
-}
-
-func (dn *DataNode) GetRack() *Rack {
- return dn.Parent().(*NodeImpl).value.(*Rack)
-}
-
-func (dn *DataNode) GetTopology() *Topology {
- p := dn.Parent()
- for p.Parent() != nil {
- p = p.Parent()
- }
- t := p.(*Topology)
- return t
-}
-
-func (dn *DataNode) MatchLocation(ip string, port int) bool {
- return dn.Ip == ip && dn.Port == port
-}
-
-func (dn *DataNode) Url() string {
- return dn.Ip + ":" + strconv.Itoa(dn.Port)
-}
-
-func (dn *DataNode) ToMap() interface{} {
- ret := make(map[string]interface{})
- ret["Url"] = dn.Url()
- ret["Volumes"] = dn.GetVolumeCount()
- ret["Max"] = dn.GetMaxVolumeCount()
- ret["Free"] = dn.FreeSpace()
- ret["PublicUrl"] = dn.PublicUrl
- return ret
-}
diff --git a/go/topology/node.go b/go/topology/node.go
deleted file mode 100644
index 6a84b4b92..000000000
--- a/go/topology/node.go
+++ /dev/null
@@ -1,272 +0,0 @@
-package topology
-
-import (
- "errors"
- "math/rand"
- "strings"
- "sync"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/storage"
-)
-
-type NodeId string
-type Node interface {
- Id() NodeId
- String() string
- FreeSpace() int
- ReserveOneVolume(r int) (*DataNode, error)
- UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
- UpAdjustVolumeCountDelta(volumeCountDelta int)
- UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
- UpAdjustMaxVolumeId(vid storage.VolumeId)
-
- GetVolumeCount() int
- GetActiveVolumeCount() int
- GetMaxVolumeCount() int
- GetMaxVolumeId() storage.VolumeId
- SetParent(Node)
- LinkChildNode(node Node)
- UnlinkChildNode(nodeId NodeId)
- CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
-
- IsDataNode() bool
- IsRack() bool
- IsDataCenter() bool
- Children() []Node
- Parent() Node
-
- GetValue() interface{} //get reference to the topology,dc,rack,datanode
-}
-type NodeImpl struct {
- id NodeId
- volumeCount int
- activeVolumeCount int
- maxVolumeCount int
- parent Node
- sync.RWMutex // lock children
- children map[NodeId]Node
- maxVolumeId storage.VolumeId
-
- //for rack, data center, topology
- nodeType string
- value interface{}
-}
-
-// the first node must satisfy filterFirstNodeFn(); the remaining nodes must each have one free slot
-func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
- candidates := make([]Node, 0, len(n.children))
- var errs []string
- n.RLock()
- for _, node := range n.children {
- if err := filterFirstNodeFn(node); err == nil {
- candidates = append(candidates, node)
- } else {
- errs = append(errs, string(node.Id())+":"+err.Error())
- }
- }
- n.RUnlock()
- if len(candidates) == 0 {
- return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
- }
- firstNode = candidates[rand.Intn(len(candidates))]
- glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
-
- restNodes = make([]Node, numberOfNodes-1)
- candidates = candidates[:0]
- n.RLock()
- for _, node := range n.children {
- if node.Id() == firstNode.Id() {
- continue
- }
- if node.FreeSpace() <= 0 {
- continue
- }
- glog.V(2).Infoln("select rest node candidate:", node.Id())
- candidates = append(candidates, node)
- }
- n.RUnlock()
- glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
- ret := len(restNodes) == 0
- for k, node := range candidates {
- if k < len(restNodes) {
- restNodes[k] = node
- if k == len(restNodes)-1 {
- ret = true
- }
- } else {
- r := rand.Intn(k + 1)
- if r < len(restNodes) {
- restNodes[r] = node
- }
- }
- }
- if !ret {
- glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
- err = errors.New("Not enough data nodes found!")
- }
- return
-}
-
-func (n *NodeImpl) IsDataNode() bool {
- return n.nodeType == "DataNode"
-}
-func (n *NodeImpl) IsRack() bool {
- return n.nodeType == "Rack"
-}
-func (n *NodeImpl) IsDataCenter() bool {
- return n.nodeType == "DataCenter"
-}
-func (n *NodeImpl) String() string {
- if n.parent != nil {
- return n.parent.String() + ":" + string(n.id)
- }
- return string(n.id)
-}
-func (n *NodeImpl) Id() NodeId {
- return n.id
-}
-func (n *NodeImpl) FreeSpace() int {
- return n.maxVolumeCount - n.volumeCount
-}
-func (n *NodeImpl) SetParent(node Node) {
- n.parent = node
-}
-func (n *NodeImpl) Children() (ret []Node) {
- n.RLock()
- defer n.RUnlock()
- for _, c := range n.children {
- ret = append(ret, c)
- }
- return ret
-}
-func (n *NodeImpl) Parent() Node {
- return n.parent
-}
-func (n *NodeImpl) GetValue() interface{} {
- return n.value
-}
-func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
- n.RLock()
- defer n.RUnlock()
- for _, node := range n.children {
- freeSpace := node.FreeSpace()
- // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
- if freeSpace <= 0 {
- continue
- }
- if r >= freeSpace {
- r -= freeSpace
- } else {
- if node.IsDataNode() && node.FreeSpace() > 0 {
- // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return node.(*DataNode), nil
- }
- assignedNode, err = node.ReserveOneVolume(r)
- if err != nil {
- return
- }
- }
- }
- return
-}
-
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
- n.maxVolumeCount += maxVolumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
- n.volumeCount += volumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
- n.activeVolumeCount += activeVolumeCountDelta
- if n.parent != nil {
- n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
- }
-}
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //only grows, never shrinks
- if n.maxVolumeId < vid {
- n.maxVolumeId = vid
- if n.parent != nil {
- n.parent.UpAdjustMaxVolumeId(vid)
- }
- }
-}
-func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
- return n.maxVolumeId
-}
-func (n *NodeImpl) GetVolumeCount() int {
- return n.volumeCount
-}
-func (n *NodeImpl) GetActiveVolumeCount() int {
- return n.activeVolumeCount
-}
-func (n *NodeImpl) GetMaxVolumeCount() int {
- return n.maxVolumeCount
-}
-
-func (n *NodeImpl) LinkChildNode(node Node) {
- n.Lock()
- defer n.Unlock()
- if n.children[node.Id()] == nil {
- n.children[node.Id()] = node
- n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
- n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
- n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
- n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
- node.SetParent(n)
- glog.V(0).Infoln(n, "adds child", node.Id())
- }
-}
-
-func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
- n.Lock()
- defer n.Unlock()
- node := n.children[nodeId]
- if node != nil {
- node.SetParent(nil)
- delete(n.children, node.Id())
- n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
- n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
- n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
- glog.V(0).Infoln(n, "removes", node, "volumeCount =", n.activeVolumeCount)
- }
-}
-
-func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
- if n.IsRack() {
- for _, c := range n.Children() {
- dn := c.(*DataNode) //can not cast n to DataNode
- if dn.LastSeen < freshThreshHold {
- if !dn.Dead {
- dn.Dead = true
- n.GetTopology().chanDeadDataNodes <- dn
- }
- }
- for _, v := range dn.GetVolumes() {
- if uint64(v.Size) >= volumeSizeLimit {
- //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
- n.GetTopology().chanFullVolumes <- v
- }
- }
- }
- } else {
- for _, c := range n.Children() {
- c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
- }
- }
-}
-
-func (n *NodeImpl) GetTopology() *Topology {
- var p Node
- p = n
- for p.Parent() != nil {
- p = p.Parent()
- }
- return p.GetValue().(*Topology)
-}
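
The rest-node selection in RandomlyPickNodes above is single-pass reservoir sampling (Algorithm R): keep the first k candidates, then replace a random reservoir slot with probability k/(i+1), so every candidate ends up in the sample with equal probability without knowing the candidate count up front. A standalone sketch of the same technique:

    package example

    import "math/rand"

    // sample returns k items chosen uniformly from candidates in one pass.
    func sample(candidates []string, k int) []string {
        reservoir := make([]string, 0, k)
        for i, c := range candidates {
            if i < k {
                reservoir = append(reservoir, c) // fill the reservoir first
                continue
            }
            if r := rand.Intn(i + 1); r < k { // keep with probability k/(i+1)
                reservoir[r] = c
            }
        }
        return reservoir
    }
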
diff --git a/go/topology/rack.go b/go/topology/rack.go
deleted file mode 100644
index 1ca2f8de8..000000000
--- a/go/topology/rack.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package topology
-
-import (
- "strconv"
- "time"
-)
-
-type Rack struct {
- NodeImpl
-}
-
-func NewRack(id string) *Rack {
- r := &Rack{}
- r.id = NodeId(id)
- r.nodeType = "Rack"
- r.children = make(map[NodeId]Node)
- r.NodeImpl.value = r
- return r
-}
-
-func (r *Rack) FindDataNode(ip string, port int) *DataNode {
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- return dn
- }
- }
- return nil
-}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- if dn.MatchLocation(ip, port) {
- dn.LastSeen = time.Now().Unix()
- if dn.Dead {
- dn.Dead = false
- r.GetTopology().chanRecoveredDataNodes <- dn
- dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
- }
- return dn
- }
- }
- dn := NewDataNode(ip + ":" + strconv.Itoa(port))
- dn.Ip = ip
- dn.Port = port
- dn.PublicUrl = publicUrl
- dn.maxVolumeCount = maxVolumeCount
- dn.LastSeen = time.Now().Unix()
- r.LinkChildNode(dn)
- return dn
-}
-
-func (r *Rack) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Id"] = r.Id()
- m["Max"] = r.GetMaxVolumeCount()
- m["Free"] = r.FreeSpace()
- var dns []interface{}
- for _, c := range r.Children() {
- dn := c.(*DataNode)
- dns = append(dns, dn.ToMap())
- }
- m["DataNodes"] = dns
- return m
-}
diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go
deleted file mode 100644
index f67ea7732..000000000
--- a/go/topology/store_replicate.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package topology
-
-import (
- "bytes"
- "errors"
- "fmt"
- "net/http"
- "strconv"
- "strings"
-
- "net/url"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/operation"
- "github.com/chrislusf/seaweedfs/go/security"
- "github.com/chrislusf/seaweedfs/go/storage"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-func ReplicatedWrite(masterNode string, s *storage.Store,
- volumeId storage.VolumeId, needle *storage.Needle,
- r *http.Request) (size uint32, errorStatus string) {
-
- //check JWT
- jwt := security.GetJwt(r)
-
- ret, err := s.Write(volumeId, needle)
- needToReplicate := !s.HasVolume(volumeId)
- if err != nil {
- errorStatus = "Failed to write to local disk (" + err.Error() + ")"
- } else if ret > 0 {
- needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
- } else {
- errorStatus = "Failed to write to local disk"
- }
- if !needToReplicate && ret > 0 {
- needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
-
- if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
- u := url.URL{
- Scheme: "http",
- Host: location.Url,
- Path: r.URL.Path,
- }
- q := url.Values{
- "type": {"replicate"},
- }
- if needle.LastModified > 0 {
- q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
- }
- if needle.IsChunkedManifest() {
- q.Set("cm", "true")
- }
- u.RawQuery = q.Encode()
- _, err := operation.Upload(u.String(),
- string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
- jwt)
- return err
- }); err != nil {
- ret = 0
- errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
- }
- }
- }
- size = ret
- return
-}
-
-func ReplicatedDelete(masterNode string, store *storage.Store,
- volumeId storage.VolumeId, n *storage.Needle,
- r *http.Request) (uint32, error) {
-
- //check JWT
- jwt := security.GetJwt(r)
-
- ret, err := store.Delete(volumeId, n)
- if err != nil {
- glog.V(0).Infoln("delete error:", err)
- return ret, err
- }
-
- needToReplicate := !store.HasVolume(volumeId)
- if !needToReplicate && ret > 0 {
- needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
- if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
- return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
- }); err != nil {
- ret = 0
- }
- }
- }
- return ret, err
-}
-
-type DistributedOperationResult map[string]error
-
-func (dr DistributedOperationResult) Error() error {
- var errs []string
- for k, v := range dr {
- if v != nil {
- errs = append(errs, fmt.Sprintf("[%s]: %v", k, v))
- }
- }
- if len(errs) == 0 {
- return nil
- }
- return errors.New(strings.Join(errs, "\n"))
-}
-
-type RemoteResult struct {
- Host string
- Error error
-}
-
-func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
- if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
- length := 0
- selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
- results := make(chan RemoteResult)
- for _, location := range lookupResult.Locations {
- if location.Url != selfUrl {
- length++
- go func(location operation.Location, results chan RemoteResult) {
- results <- RemoteResult{location.Url, op(location)}
- }(location, results)
- }
- }
- ret := DistributedOperationResult(make(map[string]error))
- for i := 0; i < length; i++ {
- result := <-results
- ret[result.Host] = result.Error
- }
- if volume := store.GetVolume(volumeId); volume != nil {
- if length+1 < volume.ReplicaPlacement.GetCopyCount() {
- return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
- }
- }
- return ret.Error()
- } else {
- glog.V(0).Infoln("Failed to lookup for", volumeId, ":", lookupErr)
- return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr)
- }
- return nil
-}
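
distributedOperation above fans out one goroutine per replica location and gathers the results over a channel before aggregating errors. A generic, self-contained sketch of that fan-out/gather shape (names are illustrative):

    package example

    import (
        "errors"
        "strings"
    )

    // fanOut runs op against every peer concurrently and joins the failures.
    func fanOut(peers []string, op func(peer string) error) error {
        type result struct {
            peer string
            err  error
        }
        results := make(chan result, len(peers))
        for _, p := range peers {
            go func(p string) { results <- result{p, op(p)} }(p)
        }
        var errs []string
        for range peers { // collect exactly one result per peer
            if r := <-results; r.err != nil {
                errs = append(errs, r.peer+": "+r.err.Error())
            }
        }
        if len(errs) > 0 {
            return errors.New(strings.Join(errs, "\n"))
        }
        return nil
    }
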
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
deleted file mode 100644
index 9a0dbc6b8..000000000
--- a/go/topology/topo_test.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package topology
-
-import (
- "testing"
-)
-
-func TestRemoveDataCenter(t *testing.T) {
- topo := setup(topologyLayout)
- topo.UnlinkChildNode(NodeId("dc2"))
- if topo.GetActiveVolumeCount() != 15 {
- t.Fail()
- }
- topo.UnlinkChildNode(NodeId("dc3"))
- if topo.GetActiveVolumeCount() != 12 {
- t.Fail()
- }
-}
diff --git a/go/topology/topology.go b/go/topology/topology.go
deleted file mode 100644
index 088639eef..000000000
--- a/go/topology/topology.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package topology
-
-import (
- "errors"
- "io/ioutil"
- "math/rand"
-
- "github.com/chrislusf/raft"
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/operation"
- "github.com/chrislusf/seaweedfs/go/sequence"
- "github.com/chrislusf/seaweedfs/go/storage"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-type Topology struct {
- NodeImpl
-
- collectionMap *util.ConcurrentReadMap
-
- pulse int64
-
- volumeSizeLimit uint64
-
- Sequence sequence.Sequencer
-
- chanDeadDataNodes chan *DataNode
- chanRecoveredDataNodes chan *DataNode
- chanFullVolumes chan storage.VolumeInfo
-
- configuration *Configuration
-
- RaftServer raft.Server
-}
-
-func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
- t := &Topology{}
- t.id = NodeId(id)
- t.nodeType = "Topology"
- t.NodeImpl.value = t
- t.children = make(map[NodeId]Node)
- t.collectionMap = util.NewConcurrentReadMap()
- t.pulse = int64(pulse)
- t.volumeSizeLimit = volumeSizeLimit
-
- t.Sequence = seq
-
- t.chanDeadDataNodes = make(chan *DataNode)
- t.chanRecoveredDataNodes = make(chan *DataNode)
- t.chanFullVolumes = make(chan storage.VolumeInfo)
-
- err := t.loadConfiguration(confFile)
-
- return t, err
-}
-
-func (t *Topology) IsLeader() bool {
- if leader, e := t.Leader(); e == nil {
- return leader == t.RaftServer.Name()
- }
- return false
-}
-
-func (t *Topology) Leader() (string, error) {
- l := ""
- if t.RaftServer != nil {
- l = t.RaftServer.Leader()
- } else {
- return "", errors.New("Raft Server not ready yet!")
- }
-
- if l == "" {
- // We are a single node cluster, we are the leader
- return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
- }
-
- return l, nil
-}
-
-func (t *Topology) loadConfiguration(configurationFile string) error {
- b, e := ioutil.ReadFile(configurationFile)
- if e == nil {
- t.configuration, e = NewConfiguration(b)
- return e
- }
- glog.V(0).Infoln("Using default configurations.")
- return nil
-}
-
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
- //maybe an issue if lots of collections?
- if collection == "" {
- for _, c := range t.collectionMap.Items() {
- if list := c.(*Collection).Lookup(vid); list != nil {
- return list
- }
- }
- } else {
- if c, ok := t.collectionMap.Find(collection); ok {
- return c.(*Collection).Lookup(vid)
- }
- }
- return nil
-}
-
-func (t *Topology) NextVolumeId() storage.VolumeId {
- vid := t.GetMaxVolumeId()
- next := vid.Next()
- go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
- return next
-}
-
-func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
- vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
- return vl.GetActiveVolumeCount(option) > 0
-}
-
-func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
- if err != nil || datanodes.Length() == 0 {
- return "", 0, nil, errors.New("No writable volumes available!")
- }
- fileId, count := t.Sequence.NextFileId(count)
- return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
-}
-
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
- return t.collectionMap.Get(collectionName, func() interface{} {
- return NewCollection(collectionName, t.volumeSizeLimit)
- }).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
-}
-
-func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
- c, hasCollection := t.collectionMap.Find(collectionName)
- return c.(*Collection), hasCollection
-}
-
-func (t *Topology) DeleteCollection(collectionName string) {
- t.collectionMap.Delete(collectionName)
-}
-
-func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
-}
-func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- glog.Infof("removing volume info:%+v", v)
- t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
-}
-
-func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
- t.Sequence.SetMax(*joinMessage.MaxFileKey)
- dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
- dc := t.GetOrCreateDataCenter(dcName)
- rack := dc.GetOrCreateRack(rackName)
- dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
- if *joinMessage.IsInit && dn != nil {
- t.UnRegisterDataNode(dn)
- }
- dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
- int(*joinMessage.Port), *joinMessage.PublicUrl,
- int(*joinMessage.MaxVolumeCount))
- var volumeInfos []storage.VolumeInfo
- for _, v := range joinMessage.Volumes {
- if vi, err := storage.NewVolumeInfo(v); err == nil {
- volumeInfos = append(volumeInfos, vi)
- } else {
- glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
- }
- }
- deletedVolumes := dn.UpdateVolumes(volumeInfos)
- for _, v := range volumeInfos {
- t.RegisterVolumeLayout(v, dn)
- }
- for _, v := range deletedVolumes {
- t.UnRegisterVolumeLayout(v, dn)
- }
-}
-
-func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- if string(dc.Id()) == dcName {
- return dc
- }
- }
- dc := NewDataCenter(dcName)
- t.LinkChildNode(dc)
- return dc
-}
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
deleted file mode 100644
index 8a3cc5a89..000000000
--- a/go/topology/topology_event_handling.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package topology
-
-import (
- "math/rand"
- "time"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/storage"
-)
-
-func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
- go func() {
- for {
- if t.IsLeader() {
- freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
- t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
- }
- time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
- }
- }()
- go func(garbageThreshold string) {
- c := time.Tick(15 * time.Minute)
- for _ = range c {
- if t.IsLeader() {
- t.Vacuum(garbageThreshold)
- }
- }
- }(garbageThreshold)
- go func() {
- for {
- select {
- case v := <-t.chanFullVolumes:
- t.SetVolumeCapacityFull(v)
- case dn := <-t.chanRecoveredDataNodes:
- t.RegisterRecoveredDataNode(dn)
- glog.V(0).Infoln("DataNode", dn, "is back alive!")
- case dn := <-t.chanDeadDataNodes:
- t.UnRegisterDataNode(dn)
- glog.V(0).Infoln("DataNode", dn, "is dead!")
- }
- }
- }()
-}
-func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
- if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
- return false
- }
- for _, dn := range vl.vid2location[volumeInfo.Id].list {
- if !volumeInfo.ReadOnly {
- dn.UpAdjustActiveVolumeCountDelta(-1)
- }
- }
- return true
-}
-func (t *Topology) UnRegisterDataNode(dn *DataNode) {
- for _, v := range dn.GetVolumes() {
- glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
- vl.SetVolumeUnavailable(dn, v.Id)
- }
- dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
- dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
- dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
- dn.Parent().UnlinkChildNode(dn.Id())
-}
-func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
- for _, v := range dn.GetVolumes() {
- vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
- if vl.isWritable(&v) {
- vl.SetVolumeAvailable(dn, v.Id)
- }
- }
-}
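
The refresh loop above sleeps a jittered interval so the dead-node scan period lands uniformly in [pulse, 2*pulse) seconds rather than firing in lockstep across masters. A sketch of just that computation, mirroring the expression used in StartRefreshWritableVolumes:

    package example

    import (
        "math/rand"
        "time"
    )

    // jitteredInterval turns a base pulse of p seconds into a random
    // duration in [p, 2p).
    func jitteredInterval(pulse int64) time.Duration {
        ms := float32(pulse*1e3) * (1 + rand.Float32())
        return time.Duration(ms) * time.Millisecond
    }
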
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
deleted file mode 100644
index ce8e9e663..000000000
--- a/go/topology/topology_map.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package topology
-
-func (t *Topology) ToMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- var dcs []interface{}
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- dcs = append(dcs, dc.ToMap())
- }
- m["DataCenters"] = dcs
- var layouts []interface{}
- for _, col := range t.collectionMap.Items() {
- c := col.(*Collection)
- for _, layout := range c.storageType2VolumeLayout.Items() {
- if layout != nil {
- tmp := layout.(*VolumeLayout).ToMap()
- tmp["collection"] = c.Name
- layouts = append(layouts, tmp)
- }
- }
- }
- m["layouts"] = layouts
- return m
-}
-
-func (t *Topology) ToVolumeMap() interface{} {
- m := make(map[string]interface{})
- m["Max"] = t.GetMaxVolumeCount()
- m["Free"] = t.FreeSpace()
- dcs := make(map[NodeId]interface{})
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- racks := make(map[NodeId]interface{})
- for _, r := range dc.Children() {
- rack := r.(*Rack)
- dataNodes := make(map[NodeId]interface{})
- for _, d := range rack.Children() {
- dn := d.(*DataNode)
- var volumes []interface{}
- for _, v := range dn.GetVolumes() {
- volumes = append(volumes, v)
- }
- dataNodes[d.Id()] = volumes
- }
- racks[r.Id()] = dataNodes
- }
- dcs[dc.Id()] = racks
- }
- m["DataCenters"] = dcs
- return m
-}
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
deleted file mode 100644
index eeb4fef69..000000000
--- a/go/topology/topology_vacuum.go
+++ /dev/null
@@ -1,158 +0,0 @@
-package topology
-
-import (
- "encoding/json"
- "errors"
- "net/url"
- "time"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/storage"
- "github.com/chrislusf/seaweedfs/go/util"
-)
-
-func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- //glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
- if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
- //glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- //glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
- ch <- ret
- }
- }(index, dn.Url(), vid)
- }
- isCheckSuccess := true
- for _ = range locationlist.list {
- select {
- case canVacuum := <-ch:
- isCheckSuccess = isCheckSuccess && canVacuum
- case <-time.After(30 * time.Minute):
- isCheckSuccess = false
- break
- }
- }
- return isCheckSuccess
-}
-func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- vl.removeFromWritable(vid)
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
- if e := vacuumVolume_Compact(url, vid); e != nil {
- glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
- ch <- true
- }
- }(index, dn.Url(), vid)
- }
- isVacuumSuccess := true
- for _ = range locationlist.list {
- select {
- case _ = <-ch:
- case <-time.After(30 * time.Minute):
- isVacuumSuccess = false
- break
- }
- }
- return isVacuumSuccess
-}
-func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- isCommitSuccess := true
- for _, dn := range locationlist.list {
- glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
- if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
- glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
- isCommitSuccess = false
- } else {
- glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
- }
- if isCommitSuccess {
- vl.SetVolumeAvailable(dn, vid)
- }
- }
- return isCommitSuccess
-}
-func (t *Topology) Vacuum(garbageThreshold string) int {
- glog.V(0).Infoln("Start vacuum on demand")
- for _, col := range t.collectionMap.Items() {
- c := col.(*Collection)
- glog.V(0).Infoln("vacuum on collection:", c.Name)
- for _, vl := range c.storageType2VolumeLayout.Items() {
- if vl != nil {
- volumeLayout := vl.(*VolumeLayout)
- for vid, locationlist := range volumeLayout.vid2location {
- glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
- if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
- batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
- }
- }
- }
- }
- }
- }
- return 0
-}
-
-type VacuumVolumeResult struct {
- Result bool
- Error string
-}
-
-func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("garbageThreshold", garbageThreshold)
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
- if err != nil {
- glog.V(0).Infoln("parameters:", values)
- return err, false
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err, false
- }
- if ret.Error != "" {
- return errors.New(ret.Error), false
- }
- return nil, ret.Result
-}
-func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
-func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
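
Vacuum above runs three phases per volume: check the garbage ratio on every replica, compact on every replica, then commit and restore writability. One caveat in the wait loops is worth noting: a bare break inside select exits only the select, not the surrounding for loop, so the timeout cases above fall through to waiting again. If a timeout is meant to abandon the wait entirely, a labeled break is needed, as in this sketch:

    package example

    import "time"

    // collectAll gathers n boolean results, giving up on timeout.
    func collectAll(ch <-chan bool, n int, timeout time.Duration) bool {
        ok := true
    waitLoop:
        for i := 0; i < n; i++ {
            select {
            case v := <-ch:
                ok = ok && v
            case <-time.After(timeout):
                ok = false
                break waitLoop // a bare break here would only exit the select
            }
        }
        return ok
    }
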
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
deleted file mode 100644
index a25ba116b..000000000
--- a/go/topology/volume_growth.go
+++ /dev/null
@@ -1,211 +0,0 @@
-package topology
-
-import (
- "fmt"
- "math/rand"
- "sync"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/storage"
-)
-
-/*
-This package is created to resolve these replica placement issues:
-1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
-2. in time of tight storage, how to reduce replica level
-3. optimizing for hot data on faster disk, cold data on cheaper storage,
-4. volume allocation for each bucket
-*/
-
-type VolumeGrowOption struct {
- Collection string
- ReplicaPlacement *storage.ReplicaPlacement
- Ttl *storage.TTL
- DataCenter string
- Rack string
- DataNode string
-}
-
-type VolumeGrowth struct {
- accessLock sync.Mutex
-}
-
-func (o *VolumeGrowOption) String() string {
- return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode)
-}
-
-func NewDefaultVolumeGrowth() *VolumeGrowth {
- return &VolumeGrowth{}
-}
-
-// one replication type may need rp.GetCopyCount() actual volumes
-// given copyCount, how many logical volumes to create
-func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
- switch copyCount {
- case 1:
- count = 7
- case 2:
- count = 6
- case 3:
- count = 3
- default:
- count = 1
- }
- return
-}
-
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
- count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
- if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
- return count, nil
- }
- return count, err
-}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
- vg.accessLock.Lock()
- defer vg.accessLock.Unlock()
-
- for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(topo, option); e == nil {
- counter += c
- } else {
- return counter, e
- }
- }
- return
-}
-
-func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
- servers, e := vg.findEmptySlotsForOneVolume(topo, option)
- if e != nil {
- return 0, e
- }
- vid := topo.NextVolumeId()
- err := vg.grow(topo, vid, option, servers...)
- return len(servers), err
-}
-
-// 1. find the main data node
-// 1.1 collect all data nodes that have one free slot
-// 1.2 collect all racks that have rp.SameRackCount+1 free slots
-// 1.3 collect all data centers that have rp.DiffRackCount+rp.SameRackCount+1 free slots
-// 2. find rest data nodes
-func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
- //find main datacenter and other data centers
- rp := option.ReplicaPlacement
- mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
- if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
- return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
- }
- if len(node.Children()) < rp.DiffRackCount+1 {
- return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
- }
- if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
- return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
- }
- possibleRacksCount := 0
- for _, rack := range node.Children() {
- possibleDataNodesCount := 0
- for _, n := range rack.Children() {
- if n.FreeSpace() >= 1 {
- possibleDataNodesCount++
- }
- }
- if possibleDataNodesCount >= rp.SameRackCount+1 {
- possibleRacksCount++
- }
- }
- if possibleRacksCount < rp.DiffRackCount+1 {
- return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
- }
- return nil
- })
- if dc_err != nil {
- return nil, dc_err
- }
-
- //find main rack and other racks
- mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
- if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
- return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
- }
- if node.FreeSpace() < rp.SameRackCount+1 {
- return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
- }
- if len(node.Children()) < rp.SameRackCount+1 {
- // a quicker way to rule out racks with too few data nodes
- return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
- }
- possibleDataNodesCount := 0
- for _, n := range node.Children() {
- if n.FreeSpace() >= 1 {
- possibleDataNodesCount++
- }
- }
- if possibleDataNodesCount < rp.SameRackCount+1 {
- return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
- }
- return nil
- })
- if rack_err != nil {
- return nil, rack_err
- }
-
-//find main server and other servers
- mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
- if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
- return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
- }
- if node.FreeSpace() < 1 {
- return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
- }
- return nil
- })
- if server_err != nil {
- return nil, server_err
- }
-
- servers = append(servers, mainServer.(*DataNode))
- for _, server := range otherServers {
- servers = append(servers, server.(*DataNode))
- }
- for _, rack := range otherRacks {
- r := rand.Intn(rack.FreeSpace())
- if server, e := rack.ReserveOneVolume(r); e == nil {
- servers = append(servers, server)
- } else {
- return servers, e
- }
- }
- for _, datacenter := range otherDataCenters {
- r := rand.Intn(datacenter.FreeSpace())
- if server, e := datacenter.ReserveOneVolume(r); e == nil {
- servers = append(servers, server)
- } else {
- return servers, e
- }
- }
- return
-}
-
-func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
- for _, server := range servers {
- if err := AllocateVolume(server, vid, option); err == nil {
- vi := storage.VolumeInfo{
- Id: vid,
- Size: 0,
- Collection: option.Collection,
- ReplicaPlacement: option.ReplicaPlacement,
- Ttl: option.Ttl,
- Version: storage.CurrentVersion,
- }
- server.AddOrUpdateVolume(vi)
- topo.RegisterVolumeLayout(vi, server)
- glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
- } else {
- glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err)
- return fmt.Errorf("Failed to assign %d: %v", vid, err)
- }
- }
- return nil
-}
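
The rp.DiffDataCenterCount / rp.DiffRackCount / rp.SameRackCount fields used above come from SeaweedFS's three-digit replica placement string: the first digit is copies on other data centers, the second copies on other racks, the third copies on other servers in the same rack, with total copies = sum + 1 (so "002" means three copies on three servers in one rack). A sketch of that mapping, mirroring what storage.ReplicaPlacement is understood to do (the parser below is illustrative):

    package example

    import "fmt"

    type replicaPlacement struct {
        DiffDataCenterCount int
        DiffRackCount       int
        SameRackCount       int
    }

    // parsePlacement maps "xyz" to the three placement counts.
    func parsePlacement(s string) (replicaPlacement, error) {
        var rp replicaPlacement
        if len(s) != 3 {
            return rp, fmt.Errorf("expected 3 digits, got %q", s)
        }
        rp.DiffDataCenterCount = int(s[0] - '0')
        rp.DiffRackCount = int(s[1] - '0')
        rp.SameRackCount = int(s[2] - '0')
        return rp, nil
    }

    func (rp replicaPlacement) copyCount() int {
        return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
    }
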
diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go
deleted file mode 100644
index 15abfcc73..000000000
--- a/go/topology/volume_growth_test.go
+++ /dev/null
@@ -1,135 +0,0 @@
-package topology
-
-import (
- "encoding/json"
- "fmt"
- "testing"
-
- "github.com/chrislusf/seaweedfs/go/sequence"
- "github.com/chrislusf/seaweedfs/go/storage"
-)
-
-var topologyLayout = `
-{
- "dc1":{
- "rack1":{
- "server111":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":3
- },
- "server112":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":10
- }
- },
- "rack2":{
- "server121":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":4
- },
- "server122":{
- "volumes":[],
- "limit":4
- },
- "server123":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":5
- }
- }
- },
- "dc2":{
- },
- "dc3":{
- "rack2":{
- "server321":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":3, "size":12312},
- {"id":5, "size":12312}
- ],
- "limit":4
- }
- }
- }
-}
-`
-
-func setup(topologyLayout string) *Topology {
- var data interface{}
- err := json.Unmarshal([]byte(topologyLayout), &data)
- if err != nil {
- fmt.Println("error:", err)
- }
- fmt.Println("data:", data)
-
- //need to connect all nodes first before servers add volumes
- topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
- sequence.NewMemorySequencer(), 32*1024, 5)
- if err != nil {
- panic("error: " + err.Error())
- }
- mTopology := data.(map[string]interface{})
- for dcKey, dcValue := range mTopology {
- dc := NewDataCenter(dcKey)
- dcMap := dcValue.(map[string]interface{})
- topo.LinkChildNode(dc)
- for rackKey, rackValue := range dcMap {
- rack := NewRack(rackKey)
- rackMap := rackValue.(map[string]interface{})
- dc.LinkChildNode(rack)
- for serverKey, serverValue := range rackMap {
- server := NewDataNode(serverKey)
- serverMap := serverValue.(map[string]interface{})
- rack.LinkChildNode(server)
- for _, v := range serverMap["volumes"].([]interface{}) {
- m := v.(map[string]interface{})
- vi := storage.VolumeInfo{
- Id: storage.VolumeId(int64(m["id"].(float64))),
- Size: uint64(m["size"].(float64)),
- Version: storage.CurrentVersion}
- server.AddOrUpdateVolume(vi)
- }
- server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
- }
- }
- }
-
- return topo
-}
-
-func TestFindEmptySlotsForOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- vg := NewDefaultVolumeGrowth()
- rp, _ := storage.NewReplicaPlacementFromString("002")
- volumeGrowOption := &VolumeGrowOption{
- Collection: "",
- ReplicaPlacement: rp,
- DataCenter: "dc1",
- Rack: "",
- DataNode: "",
- }
- servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
- if err != nil {
- fmt.Println("finding empty slots error :", err)
- t.Fail()
- }
- for _, server := range servers {
- fmt.Println("assigned node :", server.Id())
- }
-}
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
deleted file mode 100644
index 7b6c0f117..000000000
--- a/go/topology/volume_layout.go
+++ /dev/null
@@ -1,226 +0,0 @@
-package topology
-
-import (
- "errors"
- "fmt"
- "math/rand"
- "sync"
-
- "github.com/chrislusf/seaweedfs/go/glog"
- "github.com/chrislusf/seaweedfs/go/storage"
-)
-
-// mapping from volume to its locations, inverted from server to volume
-type VolumeLayout struct {
- rp *storage.ReplicaPlacement
- ttl *storage.TTL
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- volumeSizeLimit uint64
- accessLock sync.RWMutex
-}
-
-func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
- return &VolumeLayout{
- rp: rp,
- ttl: ttl,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- volumeSizeLimit: volumeSizeLimit,
- }
-}
-
-func (vl *VolumeLayout) String() string {
- return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
-}
-
-func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
- vl.accessLock.Lock()
- defer vl.accessLock.Unlock()
-
- if _, ok := vl.vid2location[v.Id]; !ok {
- vl.vid2location[v.Id] = NewVolumeLocationList()
- }
- vl.vid2location[v.Id].Set(dn)
- glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- vl.addToWritable(v.Id)
- } else {
- vl.removeFromWritable(v.Id)
- }
-}
-
-func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
- vl.accessLock.Lock()
- defer vl.accessLock.Unlock()
-
- vl.removeFromWritable(v.Id)
- delete(vl.vid2location, v.Id)
-}
-
-func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
- for _, id := range vl.writables {
- if vid == id {
- return
- }
- }
- vl.writables = append(vl.writables, vid)
-}
-
-func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
- return uint64(v.Size) < vl.volumeSizeLimit &&
- v.Version == storage.CurrentVersion &&
- !v.ReadOnly
-}
-
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
- vl.accessLock.RLock()
- defer vl.accessLock.RUnlock()
-
- if location := vl.vid2location[vid]; location != nil {
- return location.list
- }
- return nil
-}
-
-func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
- vl.accessLock.RLock()
- defer vl.accessLock.RUnlock()
-
- for _, location := range vl.vid2location {
- nodes = append(nodes, location.list...)
- }
- return
-}
-
-func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
- vl.accessLock.RLock()
- defer vl.accessLock.RUnlock()
-
- len_writers := len(vl.writables)
- if len_writers <= 0 {
- glog.V(0).Infoln("No more writable volumes!")
- return nil, 0, nil, errors.New("No more writable volumes!")
- }
- if option.DataCenter == "" {
- vid := vl.writables[rand.Intn(len_writers)]
- locationList := vl.vid2location[vid]
- if locationList != nil {
- return &vid, count, locationList, nil
- }
- return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
- }
- var vid storage.VolumeId
- var locationList *VolumeLocationList
- counter := 0
- for _, v := range vl.writables {
- volumeLocationList := vl.vid2location[v]
- for _, dn := range volumeLocationList.list {
- if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
- if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
- continue
- }
- if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
- continue
- }
- counter++
- if rand.Intn(counter) < 1 {
- vid, locationList = v, volumeLocationList
- }
- }
- }
- }
- return &vid, count, locationList, nil
-}
-
-func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
- vl.accessLock.RLock()
- defer vl.accessLock.RUnlock()
-
- if option.DataCenter == "" {
- return len(vl.writables)
- }
- counter := 0
- for _, v := range vl.writables {
- for _, dn := range vl.vid2location[v].list {
- if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
- if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
- continue
- }
- if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
- continue
- }
- counter++
- }
- }
- }
- return counter
-}
-
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
- toDeleteIndex := -1
- for k, id := range vl.writables {
- if id == vid {
- toDeleteIndex = k
- break
- }
- }
- if toDeleteIndex >= 0 {
- glog.V(0).Infoln("Volume", vid, "becomes unwritable")
- vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...)
- return true
- }
- return false
-}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
- for _, v := range vl.writables {
- if v == vid {
- return false
- }
- }
- glog.V(0).Infoln("Volume", vid, "becomes writable")
- vl.writables = append(vl.writables, vid)
- return true
-}
-
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
- vl.accessLock.Lock()
- defer vl.accessLock.Unlock()
-
- if location, ok := vl.vid2location[vid]; ok {
- if location.Remove(dn) {
- if location.Length() < vl.rp.GetCopyCount() {
- glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
- return vl.removeFromWritable(vid)
- }
- }
- }
- return false
-}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
- vl.accessLock.Lock()
- defer vl.accessLock.Unlock()
-
- vl.vid2location[vid].Set(dn)
- if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
- return vl.setVolumeWritable(vid)
- }
- return false
-}
-
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
- vl.accessLock.Lock()
- defer vl.accessLock.Unlock()
-
- // glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
- return vl.removeFromWritable(vid)
-}
-
-func (vl *VolumeLayout) ToMap() map[string]interface{} {
- m := make(map[string]interface{})
- m["replication"] = vl.rp.String()
- m["ttl"] = vl.ttl.String()
- m["writables"] = vl.writables
- //m["locations"] = vl.vid2location
- return m
-}
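
PickForWrite above selects uniformly among the writable volumes that match the data-center filter using a one-slot reservoir: each match replaces the current choice with probability 1/counter, which makes the final choice uniform over all matches in a single pass, without first building a filtered slice. A standalone sketch of that pick:

    package example

    import "math/rand"

    // pickUniform returns one matching item chosen uniformly at random.
    func pickUniform(items []int, matches func(int) bool) (chosen int, ok bool) {
        counter := 0
        for _, it := range items {
            if !matches(it) {
                continue
            }
            counter++
            if rand.Intn(counter) < 1 { // true with probability 1/counter
                chosen, ok = it, true
            }
        }
        return
    }
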
diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go
deleted file mode 100644
index d5eaf5e92..000000000
--- a/go/topology/volume_location_list.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package topology
-
-import (
- "fmt"
-)
-
-type VolumeLocationList struct {
- list []*DataNode
-}
-
-func NewVolumeLocationList() *VolumeLocationList {
- return &VolumeLocationList{}
-}
-
-func (dnll *VolumeLocationList) String() string {
- return fmt.Sprintf("%v", dnll.list)
-}
-
-func (dnll *VolumeLocationList) Head() *DataNode {
- //mark first node as master volume
- return dnll.list[0]
-}
-
-func (dnll *VolumeLocationList) Length() int {
- return len(dnll.list)
-}
-
-func (dnll *VolumeLocationList) Set(loc *DataNode) {
- for i := 0; i < len(dnll.list); i++ {
- if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
- dnll.list[i] = loc
- return
- }
- }
- dnll.list = append(dnll.list, loc)
-}
-
-func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
- for i, dnl := range dnll.list {
- if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
- return true
- }
- }
- return false
-}
-
-func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
- var changed bool
- for _, dnl := range dnll.list {
- if dnl.LastSeen < freshThreshHold {
- changed = true
- break
- }
- }
- if changed {
- var l []*DataNode
- for _, dnl := range dnll.list {
- if dnl.LastSeen >= freshThreshHold {
- l = append(l, dnl)
- }
- }
- dnll.list = l
- }
-}