Diffstat (limited to 'go/topology')
-rw-r--r--  go/topology/allocate_volume.go           35
-rw-r--r--  go/topology/cluster_commands.go          31
-rw-r--r--  go/topology/collection.go                57
-rw-r--r--  go/topology/configuration.go             65
-rw-r--r--  go/topology/configuration_test.go        42
-rw-r--r--  go/topology/data_center.go               40
-rw-r--r--  go/topology/data_node.go                115
-rw-r--r--  go/topology/node.go                     272
-rw-r--r--  go/topology/rack.go                      65
-rw-r--r--  go/topology/store_replicate.go          150
-rw-r--r--  go/topology/topo_test.go                 17
-rw-r--r--  go/topology/topology.go                 189
-rw-r--r--  go/topology/topology_event_handling.go   74
-rw-r--r--  go/topology/topology_map.go              53
-rw-r--r--  go/topology/topology_vacuum.go          158
-rw-r--r--  go/topology/volume_growth.go            211
-rw-r--r--  go/topology/volume_growth_test.go       135
-rw-r--r--  go/topology/volume_layout.go            226
-rw-r--r--  go/topology/volume_location_list.go      65
19 files changed, 0 insertions, 2000 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
deleted file mode 100644
index f014c3527..000000000
--- a/go/topology/allocate_volume.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package topology
-
-import (
-	"encoding/json"
-	"errors"
-	"fmt"
-	"net/url"
-
-	"github.com/chrislusf/seaweedfs/go/storage"
-	"github.com/chrislusf/seaweedfs/go/util"
-)
-
-type AllocateVolumeResult struct {
-	Error string
-}
-
-func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
-	values := make(url.Values)
-	values.Add("volume", vid.String())
-	values.Add("collection", option.Collection)
-	values.Add("replication", option.ReplicaPlacement.String())
-	values.Add("ttl", option.Ttl.String())
-	jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
-	if err != nil {
-		return err
-	}
-	var ret AllocateVolumeResult
-	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
-		return fmt.Errorf("Invalid JSON result for %s: %s", "/admin/assign_volume", string(jsonBlob))
-	}
-	if ret.Error != "" {
-		return errors.New(ret.Error)
-	}
-	return nil
-}
diff --git a/go/topology/cluster_commands.go b/go/topology/cluster_commands.go
deleted file mode 100644
index eac93c13c..000000000
--- a/go/topology/cluster_commands.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package topology
-
-import (
-	"github.com/chrislusf/raft"
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/storage"
-)
-
-type MaxVolumeIdCommand struct {
-	MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
-}
-
-func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
-	return &MaxVolumeIdCommand{
-		MaxVolumeId: value,
-	}
-}
-
-func (c *MaxVolumeIdCommand) CommandName() string {
-	return "MaxVolumeId"
-}
-
-func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
-	topo := server.Context().(*Topology)
-	before := topo.GetMaxVolumeId()
-	topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
-
-	glog.V(4).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
-
-	return nil, nil
-}
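MaxVolumeIdCommand follows the goraft Command convention (CommandName plus Apply), which is what lets the cluster-wide max volume id survive master failover: the leader proposes the command, and raft replays Apply against every peer's Topology. A minimal sketch of the wiring, assuming the registration happens in the master's startup code (that call site is not part of this diff, and proposeMaxVolumeId is a hypothetical helper):

package master

import (
	"github.com/chrislusf/raft"
	"github.com/chrislusf/seaweedfs/go/storage"
	"github.com/chrislusf/seaweedfs/go/topology"
)

func init() {
	// Followers can only decode log entries for registered commands,
	// so this must run before the raft server starts.
	raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
}

// proposeMaxVolumeId submits the command on the leader; raft then
// invokes Apply() on every node in the cluster.
func proposeMaxVolumeId(server raft.Server, vid storage.VolumeId) error {
	_, err := server.Do(topology.NewMaxVolumeIdCommand(vid))
	return err
}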
diff --git a/go/topology/collection.go b/go/topology/collection.go
deleted file mode 100644
index 6368900c3..000000000
--- a/go/topology/collection.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package topology
-
-import (
-	"fmt"
-
-	"github.com/chrislusf/seaweedfs/go/storage"
-	"github.com/chrislusf/seaweedfs/go/util"
-)
-
-type Collection struct {
-	Name                     string
-	volumeSizeLimit          uint64
-	storageType2VolumeLayout *util.ConcurrentReadMap
-}
-
-func NewCollection(name string, volumeSizeLimit uint64) *Collection {
-	c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
-	c.storageType2VolumeLayout = util.NewConcurrentReadMap()
-	return c
-}
-
-func (c *Collection) String() string {
-	return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
-}
-
-func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
-	keyString := rp.String()
-	if ttl != nil {
-		keyString += ttl.String()
-	}
-	vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
-		return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
-	})
-	return vl.(*VolumeLayout)
-}
-
-func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
-	for _, vl := range c.storageType2VolumeLayout.Items() {
-		if vl != nil {
-			if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
-				return list
-			}
-		}
-	}
-	return nil
-}
-
-func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
-	for _, vl := range c.storageType2VolumeLayout.Items() {
-		if vl != nil {
-			if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
-				nodes = append(nodes, list...)
-			}
-		}
-	}
-	return
-}
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
deleted file mode 100644
index ffcebb59c..000000000
--- a/go/topology/configuration.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package topology
-
-import (
-	"encoding/xml"
-)
-
-type loc struct {
-	dcName   string
-	rackName string
-}
-type rack struct {
-	Name string   `xml:"name,attr"`
-	Ips  []string `xml:"Ip"`
-}
-type dataCenter struct {
-	Name  string `xml:"name,attr"`
-	Racks []rack `xml:"Rack"`
-}
-type topology struct {
-	DataCenters []dataCenter `xml:"DataCenter"`
-}
-type Configuration struct {
-	XMLName     xml.Name `xml:"Configuration"`
-	Topo        topology `xml:"Topology"`
-	ip2location map[string]loc
-}
-
-func NewConfiguration(b []byte) (*Configuration, error) {
-	c := &Configuration{}
-	err := xml.Unmarshal(b, c)
-	c.ip2location = make(map[string]loc)
-	for _, dc := range c.Topo.DataCenters {
-		for _, rack := range dc.Racks {
-			for _, ip := range rack.Ips {
-				c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
-			}
-		}
-	}
-	return c, err
-}
-
-func (c *Configuration) String() string {
-	if b, e := xml.MarshalIndent(c, "  ", "  "); e == nil {
-		return string(b)
-	}
-	return ""
-}
-
-func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
-	if c != nil && c.ip2location != nil {
-		if loc, ok := c.ip2location[ip]; ok {
-			return loc.dcName, loc.rackName
-		}
-	}
-
-	if dcName == "" {
-		dcName = "DefaultDataCenter"
-	}
-
-	if rackName == "" {
-		rackName = "DefaultRack"
-	}
-
-	return dcName, rackName
-}
diff --git a/go/topology/configuration_test.go b/go/topology/configuration_test.go
deleted file mode 100644
index 0a353d16e..000000000
--- a/go/topology/configuration_test.go
+++ /dev/null
@@ -1,42 +0,0 @@
-package topology
-
-import (
-	"fmt"
-	"testing"
-)
-
-func TestLoadConfiguration(t *testing.T) {
-
-	confContent := `
-
-<?xml version="1.0" encoding="UTF-8" ?>
-<Configuration>
-  <Topology>
-    <DataCenter name="dc1">
-      <Rack name="rack1">
-        <Ip>192.168.1.1</Ip>
-      </Rack>
-    </DataCenter>
-    <DataCenter name="dc2">
-      <Rack name="rack1">
-        <Ip>192.168.1.2</Ip>
-      </Rack>
-      <Rack name="rack2">
-        <Ip>192.168.1.3</Ip>
-        <Ip>192.168.1.4</Ip>
-      </Rack>
-    </DataCenter>
-  </Topology>
-</Configuration>
-`
-	c, err := NewConfiguration([]byte(confContent))
-
-	fmt.Printf("%s\n", c)
-	if err != nil {
-		t.Fatalf("unmarshal error:%v", err)
-	}
-
-	if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
-		t.Fatalf("unmarshal error:%s", c)
-	}
-}
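Locate's fallback chain is worth spelling out: an IP that appears in the parsed XML always wins, an unknown IP falls back to the caller-supplied names, and empty names degrade to the built-in defaults. A small sketch against the fixture above (the 10.0.0.9 address is made up for illustration):

package topology

import "fmt"

// locateExamples assumes c came from NewConfiguration with the XML
// fixture used in TestLoadConfiguration above.
func locateExamples(c *Configuration) {
	fmt.Println(c.Locate("192.168.1.3", "", ""))      // dc2 rack2: the IP is mapped
	fmt.Println(c.Locate("10.0.0.9", "dc9", "rack9")) // dc9 rack9: unknown IP, hints pass through
	fmt.Println(c.Locate("10.0.0.9", "", ""))         // DefaultDataCenter DefaultRack
}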
diff --git a/go/topology/data_center.go b/go/topology/data_center.go
deleted file mode 100644
index bcf2dfd31..000000000
--- a/go/topology/data_center.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package topology
-
-type DataCenter struct {
-	NodeImpl
-}
-
-func NewDataCenter(id string) *DataCenter {
-	dc := &DataCenter{}
-	dc.id = NodeId(id)
-	dc.nodeType = "DataCenter"
-	dc.children = make(map[NodeId]Node)
-	dc.NodeImpl.value = dc
-	return dc
-}
-
-func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
-	for _, c := range dc.Children() {
-		rack := c.(*Rack)
-		if string(rack.Id()) == rackName {
-			return rack
-		}
-	}
-	rack := NewRack(rackName)
-	dc.LinkChildNode(rack)
-	return rack
-}
-
-func (dc *DataCenter) ToMap() interface{} {
-	m := make(map[string]interface{})
-	m["Id"] = dc.Id()
-	m["Max"] = dc.GetMaxVolumeCount()
-	m["Free"] = dc.FreeSpace()
-	var racks []interface{}
-	for _, c := range dc.Children() {
-		rack := c.(*Rack)
-		racks = append(racks, rack.ToMap())
-	}
-	m["Racks"] = racks
-	return m
-}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
deleted file mode 100644
index 3bad8c188..000000000
--- a/go/topology/data_node.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package topology
-
-import (
-	"fmt"
-	"strconv"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/storage"
-)
-
-type DataNode struct {
-	NodeImpl
-	volumes   map[storage.VolumeId]storage.VolumeInfo
-	Ip        string
-	Port      int
-	PublicUrl string
-	LastSeen  int64 // unix time in seconds
-	Dead      bool
-}
-
-func NewDataNode(id string) *DataNode {
-	s := &DataNode{}
-	s.id = NodeId(id)
-	s.nodeType = "DataNode"
-	s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
-	s.NodeImpl.value = s
-	return s
-}
-
-func (dn *DataNode) String() string {
-	dn.RLock()
-	defer dn.RUnlock()
-	return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
-}
-
-func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
-	dn.Lock()
-	defer dn.Unlock()
-	if _, ok := dn.volumes[v.Id]; !ok {
-		dn.volumes[v.Id] = v
-		dn.UpAdjustVolumeCountDelta(1)
-		if !v.ReadOnly {
-			dn.UpAdjustActiveVolumeCountDelta(1)
-		}
-		dn.UpAdjustMaxVolumeId(v.Id)
-	} else {
-		dn.volumes[v.Id] = v
-	}
-}
-
-func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
-	actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
-	for _, v := range actualVolumes {
-		actualVolumeMap[v.Id] = v
-	}
-	dn.Lock() // write lock: the loop below mutates dn.volumes
-	for vid, v := range dn.volumes {
-		if _, ok := actualVolumeMap[vid]; !ok {
-			glog.V(0).Infoln("Deleting volume id:", vid)
-			delete(dn.volumes, vid)
-			deletedVolumes = append(deletedVolumes, v)
-			dn.UpAdjustVolumeCountDelta(-1)
-			dn.UpAdjustActiveVolumeCountDelta(-1)
-		}
-	} //TODO: adjust max volume id, if need to reclaim volume ids
-	dn.Unlock()
-	for _, v := range actualVolumes {
-		dn.AddOrUpdateVolume(v)
-	}
-	return
-}
-
-func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
-	dn.RLock()
-	for _, v := range dn.volumes {
-		ret = append(ret, v)
-	}
-	dn.RUnlock()
-	return ret
-}
-
-func (dn *DataNode) GetDataCenter() *DataCenter {
-	return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
-}
-
-func (dn *DataNode) GetRack() *Rack {
-	return dn.Parent().(*NodeImpl).value.(*Rack)
-}
-
-func (dn *DataNode) GetTopology() *Topology {
-	p := dn.Parent()
-	for p.Parent() != nil {
-		p = p.Parent()
-	}
-	t := p.(*Topology)
-	return t
-}
-
-func (dn *DataNode) MatchLocation(ip string, port int) bool {
-	return dn.Ip == ip && dn.Port == port
-}
-
-func (dn *DataNode) Url() string {
-	return dn.Ip + ":" + strconv.Itoa(dn.Port)
-}
-
-func (dn *DataNode) ToMap() interface{} {
-	ret := make(map[string]interface{})
-	ret["Url"] = dn.Url()
-	ret["Volumes"] = dn.GetVolumeCount()
-	ret["Max"] = dn.GetMaxVolumeCount()
-	ret["Free"] = dn.FreeSpace()
-	ret["PublicUrl"] = dn.PublicUrl
-	return ret
-}
diff --git a/go/topology/node.go b/go/topology/node.go
deleted file mode 100644
index 6a84b4b92..000000000
--- a/go/topology/node.go
+++ /dev/null
@@ -1,272 +0,0 @@
-package topology
-
-import (
-	"errors"
-	"math/rand"
-	"strings"
-	"sync"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/storage"
-)
-
-type NodeId string
-type Node interface {
-	Id() NodeId
-	String() string
-	FreeSpace() int
-	ReserveOneVolume(r int) (*DataNode, error)
-	UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
-	UpAdjustVolumeCountDelta(volumeCountDelta int)
-	UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
-	UpAdjustMaxVolumeId(vid storage.VolumeId)
-
-	GetVolumeCount() int
-	GetActiveVolumeCount() int
-	GetMaxVolumeCount() int
-	GetMaxVolumeId() storage.VolumeId
-	SetParent(Node)
-	LinkChildNode(node Node)
-	UnlinkChildNode(nodeId NodeId)
-	CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
-
-	IsDataNode() bool
-	IsRack() bool
-	IsDataCenter() bool
-	Children() []Node
-	Parent() Node
-
-	GetValue() interface{} //get reference to the topology,dc,rack,datanode
-}
-type NodeImpl struct {
-	id                NodeId
-	volumeCount       int
-	activeVolumeCount int
-	maxVolumeCount    int
-	parent            Node
-	sync.RWMutex      // lock children
-	children          map[NodeId]Node
-	maxVolumeId       storage.VolumeId
-
-	//for rack, data center, topology
-	nodeType string
-	value    interface{}
-}
-
-// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
-func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
-	candidates := make([]Node, 0, len(n.children))
-	var errs []string
-	n.RLock()
-	for _, node := range n.children {
-		if err := filterFirstNodeFn(node); err == nil {
-			candidates = append(candidates, node)
-		} else {
-			errs = append(errs, string(node.Id())+":"+err.Error())
-		}
-	}
-	n.RUnlock()
-	if len(candidates) == 0 {
-		return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
-	}
-	firstNode = candidates[rand.Intn(len(candidates))]
-	glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
-
-	restNodes = make([]Node, numberOfNodes-1)
-	candidates = candidates[:0]
-	n.RLock()
-	for _, node := range n.children {
-		if node.Id() == firstNode.Id() {
-			continue
-		}
-		if node.FreeSpace() <= 0 {
-			continue
-		}
-		glog.V(2).Infoln("select rest node candidate:", node.Id())
-		candidates = append(candidates, node)
-	}
-	n.RUnlock()
-	glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
-	ret := len(restNodes) == 0
-	for k, node := range candidates {
-		if k < len(restNodes) {
-			restNodes[k] = node
-			if k == len(restNodes)-1 {
-				ret = true
-			}
-		} else {
-			r := rand.Intn(k + 1)
-			if r < len(restNodes) {
-				restNodes[r] = node
-			}
-		}
-	}
-	if !ret {
-		glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
-		err = errors.New("Not enough data node found!")
-	}
-	return
-}
-
-func (n *NodeImpl) IsDataNode() bool {
-	return n.nodeType == "DataNode"
-}
-func (n *NodeImpl) IsRack() bool {
-	return n.nodeType == "Rack"
-}
-func (n *NodeImpl) IsDataCenter() bool {
-	return n.nodeType == "DataCenter"
-}
-func (n *NodeImpl) String() string {
-	if n.parent != nil {
-		return n.parent.String() + ":" + string(n.id)
-	}
-	return string(n.id)
-}
-func (n *NodeImpl) Id() NodeId {
-	return n.id
-}
-func (n *NodeImpl) FreeSpace() int {
-	return n.maxVolumeCount - n.volumeCount
-}
-func (n *NodeImpl) SetParent(node Node) {
-	n.parent = node
-}
-func (n *NodeImpl) Children() (ret []Node) {
-	n.RLock()
-	defer n.RUnlock()
-	for _, c := range n.children {
-		ret = append(ret, c)
-	}
-	return ret
-}
-func (n *NodeImpl) Parent() Node {
-	return n.parent
-}
-func (n *NodeImpl) GetValue() interface{} {
-	return n.value
-}
-func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
-	n.RLock()
-	defer n.RUnlock()
-	for _, node := range n.children {
-		freeSpace := node.FreeSpace()
-		// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
-		if freeSpace <= 0 {
-			continue
-		}
-		if r >= freeSpace {
-			r -= freeSpace
-		} else {
-			if node.IsDataNode() && node.FreeSpace() > 0 {
-				// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
-				return node.(*DataNode), nil
-			}
-			assignedNode, err = node.ReserveOneVolume(r)
-			if err != nil {
-				return
-			}
-		}
-	}
-	return
-}
-
-func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
-	n.maxVolumeCount += maxVolumeCountDelta
-	if n.parent != nil {
-		n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
-	}
-}
-func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
-	n.volumeCount += volumeCountDelta
-	if n.parent != nil {
-		n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
-	}
-}
-func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
-	n.activeVolumeCount += activeVolumeCountDelta
-	if n.parent != nil {
-		n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
-	}
-}
-func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) {
-	if n.maxVolumeId < vid {
-		n.maxVolumeId = vid
-		if n.parent != nil {
-			n.parent.UpAdjustMaxVolumeId(vid)
-		}
-	}
-}
-func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
-	return n.maxVolumeId
-}
-func (n *NodeImpl) GetVolumeCount() int {
-	return n.volumeCount
-}
-func (n *NodeImpl) GetActiveVolumeCount() int {
-	return n.activeVolumeCount
-}
-func (n *NodeImpl) GetMaxVolumeCount() int {
-	return n.maxVolumeCount
-}
-
-func (n *NodeImpl) LinkChildNode(node Node) {
-	n.Lock()
-	defer n.Unlock()
-	if n.children[node.Id()] == nil {
-		n.children[node.Id()] = node
-		n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
-		n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
-		n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
-		n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
-		node.SetParent(n)
-		glog.V(0).Infoln(n, "adds child", node.Id())
-	}
-}
-
-func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
-	n.Lock()
-	defer n.Unlock()
-	node := n.children[nodeId]
-	if node != nil {
-		node.SetParent(nil)
-		delete(n.children, node.Id())
-		n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
-		n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
-		n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
-		glog.V(0).Infoln(n, "removes", node, "volumeCount =", n.activeVolumeCount)
-	}
-}
-
-func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
-	if n.IsRack() {
-		for _, c := range n.Children() {
-			dn := c.(*DataNode) //can not cast n to DataNode
-			if dn.LastSeen < freshThreshHold {
-				if !dn.Dead {
-					dn.Dead = true
-					n.GetTopology().chanDeadDataNodes <- dn
-				}
-			}
-			for _, v := range dn.GetVolumes() {
-				if uint64(v.Size) >= volumeSizeLimit {
-					//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
-					n.GetTopology().chanFullVolumes <- v
-				}
-			}
-		}
-	} else {
-		for _, c := range n.Children() {
-			c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
-		}
-	}
-}
-
-func (n *NodeImpl) GetTopology() *Topology {
-	var p Node
-	p = n
-	for p.Parent() != nil {
-		p = p.Parent()
-	}
-	return p.GetValue().(*Topology)
-}
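The restNodes loop in RandomlyPickNodes is reservoir sampling: the first k candidates fill the k slots, and each later candidate i (0-based) replaces a uniformly chosen slot with probability k/(i+1), so every candidate ends up equally likely to be kept without a second pass. A standalone sketch of just that selection step, with placeholder node names:

package main

import (
	"fmt"
	"math/rand"
)

// sampleK mirrors the restNodes loop in RandomlyPickNodes: keep a
// uniform random subset of size k from a stream of candidates.
func sampleK(candidates []string, k int) []string {
	picked := make([]string, 0, k)
	for i, c := range candidates {
		if i < k {
			picked = append(picked, c) // fill the reservoir first
		} else if r := rand.Intn(i + 1); r < k {
			picked[r] = c // replace a random slot with probability k/(i+1)
		}
	}
	return picked
}

func main() {
	dataNodes := []string{"dn1", "dn2", "dn3", "dn4", "dn5"}
	fmt.Println(sampleK(dataNodes, 2)) // e.g. [dn4 dn2]
}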
diff --git a/go/topology/rack.go b/go/topology/rack.go
deleted file mode 100644
index 1ca2f8de8..000000000
--- a/go/topology/rack.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package topology
-
-import (
-	"strconv"
-	"time"
-)
-
-type Rack struct {
-	NodeImpl
-}
-
-func NewRack(id string) *Rack {
-	r := &Rack{}
-	r.id = NodeId(id)
-	r.nodeType = "Rack"
-	r.children = make(map[NodeId]Node)
-	r.NodeImpl.value = r
-	return r
-}
-
-func (r *Rack) FindDataNode(ip string, port int) *DataNode {
-	for _, c := range r.Children() {
-		dn := c.(*DataNode)
-		if dn.MatchLocation(ip, port) {
-			return dn
-		}
-	}
-	return nil
-}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
-	for _, c := range r.Children() {
-		dn := c.(*DataNode)
-		if dn.MatchLocation(ip, port) {
-			dn.LastSeen = time.Now().Unix()
-			if dn.Dead {
-				dn.Dead = false
-				r.GetTopology().chanRecoveredDataNodes <- dn
-				dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
-			}
-			return dn
-		}
-	}
-	dn := NewDataNode(ip + ":" + strconv.Itoa(port))
-	dn.Ip = ip
-	dn.Port = port
-	dn.PublicUrl = publicUrl
-	dn.maxVolumeCount = maxVolumeCount
-	dn.LastSeen = time.Now().Unix()
-	r.LinkChildNode(dn)
-	return dn
-}
-
-func (r *Rack) ToMap() interface{} {
-	m := make(map[string]interface{})
-	m["Id"] = r.Id()
-	m["Max"] = r.GetMaxVolumeCount()
-	m["Free"] = r.FreeSpace()
-	var dns []interface{}
-	for _, c := range r.Children() {
-		dn := c.(*DataNode)
-		dns = append(dns, dn.ToMap())
-	}
-	m["DataNodes"] = dns
-	return m
-}
diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go
deleted file mode 100644
index f67ea7732..000000000
--- a/go/topology/store_replicate.go
+++ /dev/null
@@ -1,150 +0,0 @@
-package topology
-
-import (
-	"bytes"
-	"errors"
-	"fmt"
-	"net/http"
-	"net/url"
-	"strconv"
-	"strings"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/operation"
-	"github.com/chrislusf/seaweedfs/go/security"
-	"github.com/chrislusf/seaweedfs/go/storage"
-	"github.com/chrislusf/seaweedfs/go/util"
-)
-
-func ReplicatedWrite(masterNode string, s *storage.Store,
-	volumeId storage.VolumeId, needle *storage.Needle,
-	r *http.Request) (size uint32, errorStatus string) {
-
-	//check JWT
-	jwt := security.GetJwt(r)
-
-	ret, err := s.Write(volumeId, needle)
-	needToReplicate := !s.HasVolume(volumeId)
-	if err != nil {
-		errorStatus = "Failed to write to local disk (" + err.Error() + ")"
-	} else if ret > 0 {
-		needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
-	} else {
-		errorStatus = "Failed to write to local disk"
-	}
-	if !needToReplicate && ret > 0 {
-		needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
-	}
-	if needToReplicate { //send to other replica locations
-		if r.FormValue("type") != "replicate" {
-
-			if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
-				u := url.URL{
-					Scheme: "http",
-					Host:   location.Url,
-					Path:   r.URL.Path,
-				}
-				q := url.Values{
-					"type": {"replicate"},
-				}
-				if needle.LastModified > 0 {
-					q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
-				}
-				if needle.IsChunkedManifest() {
-					q.Set("cm", "true")
-				}
-				u.RawQuery = q.Encode()
-				_, err := operation.Upload(u.String(),
-					string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
-					jwt)
-				return err
-			}); err != nil {
-				ret = 0
-				errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
-			}
-		}
-	}
-	size = ret
-	return
-}
-
-func ReplicatedDelete(masterNode string, store *storage.Store,
-	volumeId storage.VolumeId, n *storage.Needle,
-	r *http.Request) (uint32, error) {
-
-	//check JWT
-	jwt := security.GetJwt(r)
-
-	ret, err := store.Delete(volumeId, n)
-	if err != nil {
-		glog.V(0).Infoln("delete error:", err)
-		return ret, err
-	}
-
-	needToReplicate := !store.HasVolume(volumeId)
-	if !needToReplicate && ret > 0 {
-		needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
-	}
-	if needToReplicate { //send to other replica locations
-		if r.FormValue("type") != "replicate" {
-			if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
-				return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
-			}); err != nil {
-				ret = 0
-			}
-		}
-	}
-	return ret, err
-}
-
-type DistributedOperationResult map[string]error
-
-func (dr DistributedOperationResult) Error() error {
-	var errs []string
-	for k, v := range dr {
-		if v != nil {
-			errs = append(errs, fmt.Sprintf("[%s]: %v", k, v))
-		}
-	}
-	if len(errs) == 0 {
-		return nil
-	}
-	return errors.New(strings.Join(errs, "\n"))
-}
-
-type RemoteResult struct {
-	Host  string
-	Error error
-}
-
-func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
-	if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
-		length := 0
-		selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
-		results := make(chan RemoteResult)
-		for _, location := range lookupResult.Locations {
-			if location.Url != selfUrl {
-				length++
-				go func(location operation.Location, results chan RemoteResult) {
-					results <- RemoteResult{location.Url, op(location)}
-				}(location, results)
-			}
-		}
-		ret := DistributedOperationResult(make(map[string]error))
-		for i := 0; i < length; i++ {
-			result := <-results
-			ret[result.Host] = result.Error
-		}
-		if volume := store.GetVolume(volumeId); volume != nil {
-			if length+1 < volume.ReplicaPlacement.GetCopyCount() {
-				return fmt.Errorf("replicating operations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
-			}
-		}
-		return ret.Error()
-	} else {
-		glog.V(0).Infoln("Failed to lookup for", volumeId, ":", lookupErr)
-		return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr)
-	}
-	return nil
-}
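DistributedOperationResult collapses one error per replica into a single error, which is what ReplicatedWrite and ReplicatedDelete ultimately surface to the caller. A small sketch of the aggregation behavior, with placeholder hosts:

package topology

import (
	"errors"
	"fmt"
)

func aggregateExample() {
	ret := DistributedOperationResult{
		"10.0.0.1:8080": nil,                              // this replica succeeded
		"10.0.0.2:8080": errors.New("connection refused"), // this replica failed
	}
	if err := ret.Error(); err != nil {
		fmt.Println(err) // prints: [10.0.0.2:8080]: connection refused
	}
}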
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
deleted file mode 100644
index 9a0dbc6b8..000000000
--- a/go/topology/topo_test.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package topology
-
-import (
-	"testing"
-)
-
-func TestRemoveDataCenter(t *testing.T) {
-	topo := setup(topologyLayout)
-	topo.UnlinkChildNode(NodeId("dc2"))
-	if topo.GetActiveVolumeCount() != 15 {
-		t.Fail()
-	}
-	topo.UnlinkChildNode(NodeId("dc3"))
-	if topo.GetActiveVolumeCount() != 12 {
-		t.Fail()
-	}
-}
diff --git a/go/topology/topology.go b/go/topology/topology.go
deleted file mode 100644
index 088639eef..000000000
--- a/go/topology/topology.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package topology
-
-import (
-	"errors"
-	"io/ioutil"
-	"math/rand"
-
-	"github.com/chrislusf/raft"
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/operation"
-	"github.com/chrislusf/seaweedfs/go/sequence"
-	"github.com/chrislusf/seaweedfs/go/storage"
-	"github.com/chrislusf/seaweedfs/go/util"
-)
-
-type Topology struct {
-	NodeImpl
-
-	collectionMap *util.ConcurrentReadMap
-
-	pulse int64
-
-	volumeSizeLimit uint64
-
-	Sequence sequence.Sequencer
-
-	chanDeadDataNodes      chan *DataNode
-	chanRecoveredDataNodes chan *DataNode
-	chanFullVolumes        chan storage.VolumeInfo
-
-	configuration *Configuration
-
-	RaftServer raft.Server
-}
-
-func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
-	t := &Topology{}
-	t.id = NodeId(id)
-	t.nodeType = "Topology"
-	t.NodeImpl.value = t
-	t.children = make(map[NodeId]Node)
-	t.collectionMap = util.NewConcurrentReadMap()
-	t.pulse = int64(pulse)
-	t.volumeSizeLimit = volumeSizeLimit
-
-	t.Sequence = seq
-
-	t.chanDeadDataNodes = make(chan *DataNode)
-	t.chanRecoveredDataNodes = make(chan *DataNode)
-	t.chanFullVolumes = make(chan storage.VolumeInfo)
-
-	err := t.loadConfiguration(confFile)
-
-	return t, err
-}
-
-func (t *Topology) IsLeader() bool {
-	if leader, e := t.Leader(); e == nil {
-		return leader == t.RaftServer.Name()
-	}
-	return false
-}
-
-func (t *Topology) Leader() (string, error) {
-	l := ""
-	if t.RaftServer != nil {
-		l = t.RaftServer.Leader()
-	} else {
-		return "", errors.New("Raft Server not ready yet!")
-	}
-
-	if l == "" {
-		// We are a single node cluster, we are the leader
-		return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
-	}
-
-	return l, nil
-}
-
-func (t *Topology) loadConfiguration(configurationFile string) error {
-	b, e := ioutil.ReadFile(configurationFile)
-	if e == nil {
-		t.configuration, e = NewConfiguration(b)
-		return e
-	}
-	glog.V(0).Infoln("Using default configurations.")
-	return nil
-}
-
-func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
-	//maybe an issue if lots of collections?
-	if collection == "" {
-		for _, c := range t.collectionMap.Items() {
-			if list := c.(*Collection).Lookup(vid); list != nil {
-				return list
-			}
-		}
-	} else {
-		if c, ok := t.collectionMap.Find(collection); ok {
-			return c.(*Collection).Lookup(vid)
-		}
-	}
-	return nil
-}
-
-func (t *Topology) NextVolumeId() storage.VolumeId {
-	vid := t.GetMaxVolumeId()
-	next := vid.Next()
-	go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
-	return next
-}
-
-func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
-	vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
-	return vl.GetActiveVolumeCount(option) > 0
-}
-
-func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
-	vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
-	if err != nil || datanodes.Length() == 0 {
-		return "", 0, nil, errors.New("No writable volumes available!")
-	}
-	fileId, count := t.Sequence.NextFileId(count)
-	return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
-}
-
-func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
-	return t.collectionMap.Get(collectionName, func() interface{} {
-		return NewCollection(collectionName, t.volumeSizeLimit)
-	}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
-}
-
-func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
-	c, hasCollection := t.collectionMap.Find(collectionName)
-	return c.(*Collection), hasCollection
-}
-
-func (t *Topology) DeleteCollection(collectionName string) {
-	t.collectionMap.Delete(collectionName)
-}
-
-func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
-	t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
-}
-func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
-	glog.Infof("removing volume info:%+v", v)
-	t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
-}
-
-func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
-	t.Sequence.SetMax(*joinMessage.MaxFileKey)
-	dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
-	dc := t.GetOrCreateDataCenter(dcName)
-	rack := dc.GetOrCreateRack(rackName)
-	dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
-	if *joinMessage.IsInit && dn != nil {
-		t.UnRegisterDataNode(dn)
-	}
-	dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
-		int(*joinMessage.Port), *joinMessage.PublicUrl,
-		int(*joinMessage.MaxVolumeCount))
-	var volumeInfos []storage.VolumeInfo
-	for _, v := range joinMessage.Volumes {
-		if vi, err := storage.NewVolumeInfo(v); err == nil {
-			volumeInfos = append(volumeInfos, vi)
-		} else {
-			glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
-		}
-	}
-	deletedVolumes := dn.UpdateVolumes(volumeInfos)
-	for _, v := range volumeInfos {
-		t.RegisterVolumeLayout(v, dn)
-	}
-	for _, v := range deletedVolumes {
-		t.UnRegisterVolumeLayout(v, dn)
-	}
-}
-
-func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
-	for _, c := range t.Children() {
-		dc := c.(*DataCenter)
-		if string(dc.Id()) == dcName {
-			return dc
-		}
-	}
-	dc := NewDataCenter(dcName)
-	t.LinkChildNode(dc)
-	return dc
-}
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
deleted file mode 100644
index 8a3cc5a89..000000000
--- a/go/topology/topology_event_handling.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package topology
-
-import (
-	"math/rand"
-	"time"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/storage"
-)
-
-func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
-	go func() {
-		for {
-			if t.IsLeader() {
-				freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
-				t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
-			}
-			time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
-		}
-	}()
-	go func(garbageThreshold string) {
-		c := time.Tick(15 * time.Minute)
-		for _ = range c {
-			if t.IsLeader() {
-				t.Vacuum(garbageThreshold)
-			}
-		}
-	}(garbageThreshold)
-	go func() {
-		for {
-			select {
-			case v := <-t.chanFullVolumes:
-				t.SetVolumeCapacityFull(v)
-			case dn := <-t.chanRecoveredDataNodes:
-				t.RegisterRecoveredDataNode(dn)
-				glog.V(0).Infoln("DataNode", dn, "is back alive!")
-			case dn := <-t.chanDeadDataNodes:
-				t.UnRegisterDataNode(dn)
-				glog.V(0).Infoln("DataNode", dn, "is dead!")
-			}
-		}
-	}()
-}
-func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
-	vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
-	if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
-		return false
-	}
-	for _, dn := range vl.vid2location[volumeInfo.Id].list {
-		if !volumeInfo.ReadOnly {
-			dn.UpAdjustActiveVolumeCountDelta(-1)
-		}
-	}
-	return true
-}
-func (t *Topology) UnRegisterDataNode(dn *DataNode) {
-	for _, v := range dn.GetVolumes() {
-		glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
-		vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
-		vl.SetVolumeUnavailable(dn, v.Id)
-	}
-	dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
-	dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
-	dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
-	dn.Parent().UnlinkChildNode(dn.Id())
-}
-func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
-	for _, v := range dn.GetVolumes() {
-		vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
-		if vl.isWritable(&v) {
-			vl.SetVolumeAvailable(dn, v.Id)
-		}
-	}
-}
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
deleted file mode 100644
index ce8e9e663..000000000
--- a/go/topology/topology_map.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package topology
-
-func (t *Topology) ToMap() interface{} {
-	m := make(map[string]interface{})
-	m["Max"] = t.GetMaxVolumeCount()
-	m["Free"] = t.FreeSpace()
-	var dcs []interface{}
-	for _, c := range t.Children() {
-		dc := c.(*DataCenter)
-		dcs = append(dcs, dc.ToMap())
-	}
-	m["DataCenters"] = dcs
-	var layouts []interface{}
-	for _, col := range t.collectionMap.Items() {
-		c := col.(*Collection)
-		for _, layout := range c.storageType2VolumeLayout.Items() {
-			if layout != nil {
-				tmp := layout.(*VolumeLayout).ToMap()
-				tmp["collection"] = c.Name
-				layouts = append(layouts, tmp)
-			}
-		}
-	}
-	m["layouts"] = layouts
-	return m
-}
-
-func (t *Topology) ToVolumeMap() interface{} {
-	m := make(map[string]interface{})
-	m["Max"] = t.GetMaxVolumeCount()
-	m["Free"] = t.FreeSpace()
-	dcs := make(map[NodeId]interface{})
-	for _, c := range t.Children() {
-		dc := c.(*DataCenter)
-		racks := make(map[NodeId]interface{})
-		for _, r := range dc.Children() {
-			rack := r.(*Rack)
-			dataNodes := make(map[NodeId]interface{})
-			for _, d := range rack.Children() {
-				dn := d.(*DataNode)
-				var volumes []interface{}
-				for _, v := range dn.GetVolumes() {
-					volumes = append(volumes, v)
-				}
-				dataNodes[d.Id()] = volumes
-			}
-			racks[r.Id()] = dataNodes
-		}
-		dcs[dc.Id()] = racks
-	}
-	m["DataCenters"] = dcs
-	return m
-}
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
deleted file mode 100644
index eeb4fef69..000000000
--- a/go/topology/topology_vacuum.go
+++ /dev/null
@@ -1,158 +0,0 @@
-package topology
-
-import (
-	"encoding/json"
-	"errors"
-	"net/url"
-	"time"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/storage"
-	"github.com/chrislusf/seaweedfs/go/util"
-)
-
-func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
-	ch := make(chan bool, locationlist.Length())
-	for index, dn := range locationlist.list {
-		go func(index int, url string, vid storage.VolumeId) {
-			//glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
-			if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
-				//glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
-				ch <- false
-			} else {
-				//glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
-				ch <- ret
-			}
-		}(index, dn.Url(), vid)
-	}
-	isCheckSuccess := true
-	for _ = range locationlist.list {
-		select {
-		case canVacuum := <-ch:
-			isCheckSuccess = isCheckSuccess && canVacuum
-		case <-time.After(30 * time.Minute):
-			isCheckSuccess = false
-			break
-		}
-	}
-	return isCheckSuccess
-}
-func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
-	vl.removeFromWritable(vid)
-	ch := make(chan bool, locationlist.Length())
-	for index, dn := range locationlist.list {
-		go func(index int, url string, vid storage.VolumeId) {
-			glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
-			if e := vacuumVolume_Compact(url, vid); e != nil {
-				glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
-				ch <- false
-			} else {
-				glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
-				ch <- true
-			}
-		}(index, dn.Url(), vid)
-	}
-	isVacuumSuccess := true
-	for _ = range locationlist.list {
-		select {
-		case _ = <-ch:
-		case <-time.After(30 * time.Minute):
-			isVacuumSuccess = false
-			break
-		}
-	}
-	return isVacuumSuccess
-}
-func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
-	isCommitSuccess := true
-	for _, dn := range locationlist.list {
-		glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
-		if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
-			glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
-			isCommitSuccess = false
-		} else {
-			glog.V(0).Infoln("Complete Committing vacuum", vid, "on", dn.Url())
-		}
-		if isCommitSuccess {
-			vl.SetVolumeAvailable(dn, vid)
-		}
-	}
-	return isCommitSuccess
-}
-func (t *Topology) Vacuum(garbageThreshold string) int {
-	glog.V(0).Infoln("Start vacuum on demand")
-	for _, col := range t.collectionMap.Items() {
-		c := col.(*Collection)
-		glog.V(0).Infoln("vacuum on collection:", c.Name)
-		for _, vl := range c.storageType2VolumeLayout.Items() {
-			if vl != nil {
-				volumeLayout := vl.(*VolumeLayout)
-				for vid, locationlist := range volumeLayout.vid2location {
-					glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
-					if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
-						if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
-							batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
-						}
-					}
-				}
-			}
-		}
-	}
-	return 0
-}
-
-type VacuumVolumeResult struct {
-	Result bool
-	Error  string
-}
-
-func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
-	values := make(url.Values)
-	values.Add("volume", vid.String())
-	values.Add("garbageThreshold", garbageThreshold)
-	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
-	if err != nil {
-		glog.V(0).Infoln("parameters:", values)
-		return err, false
-	}
-	var ret VacuumVolumeResult
-	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
-		return err, false
-	}
-	if ret.Error != "" {
-		return errors.New(ret.Error), false
-	}
-	return nil, ret.Result
-}
-func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
-	values := make(url.Values)
-	values.Add("volume", vid.String())
-	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
-	if err != nil {
-		return err
-	}
-	var ret VacuumVolumeResult
-	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
-		return err
-	}
-	if ret.Error != "" {
-		return errors.New(ret.Error)
-	}
-	return nil
-}
-func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
-	values := make(url.Values)
-	values.Add("volume", vid.String())
-	jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
-	if err != nil {
-		return err
-	}
-	var ret VacuumVolumeResult
-	if err := json.Unmarshal(jsonBlob, &ret); err != nil {
-		return err
-	}
-	if ret.Error != "" {
-		return errors.New(ret.Error)
-	}
-	return nil
-}
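Vacuum is a three-phase protocol: check asks every replica whether the garbage ratio crosses the threshold, compact rewrites each replica off to the side while the volume is pulled from the writable list, and commit swaps the compacted copy in and restores availability. A condensed sketch of one cycle against a single replica, using only the helpers defined above (the threshold value is an example):

package topology

import "github.com/chrislusf/seaweedfs/go/storage"

// vacuumOneReplica drives check -> compact -> commit for one replica.
func vacuumOneReplica(urlLocation string, vid storage.VolumeId) error {
	err, needed := vacuumVolume_Check(urlLocation, vid, "0.3") // 30% garbage threshold
	if err != nil || !needed {
		return err
	}
	if err := vacuumVolume_Compact(urlLocation, vid); err != nil {
		return err
	}
	return vacuumVolume_Commit(urlLocation, vid)
}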
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
deleted file mode 100644
index a25ba116b..000000000
--- a/go/topology/volume_growth.go
+++ /dev/null
@@ -1,211 +0,0 @@
-package topology
-
-import (
-	"fmt"
-	"math/rand"
-	"sync"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/storage"
-)
-
-/*
-This package is created to resolve these replica placement issues:
-1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
-2. in time of tight storage, how to reduce replica level
-3. optimizing for hot data on faster disk, cold data on cheaper storage,
-4. volume allocation for each bucket
-*/
-
-type VolumeGrowOption struct {
-	Collection       string
-	ReplicaPlacement *storage.ReplicaPlacement
-	Ttl              *storage.TTL
-	DataCenter       string
-	Rack             string
-	DataNode         string
-}
-
-type VolumeGrowth struct {
-	accessLock sync.Mutex
-}
-
-func (o *VolumeGrowOption) String() string {
-	return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode)
-}
-
-func NewDefaultVolumeGrowth() *VolumeGrowth {
-	return &VolumeGrowth{}
-}
-
-// one replication type may need rp.GetCopyCount() actual volumes
-// given copyCount, how many logical volumes to create
-func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
-	switch copyCount {
-	case 1:
-		count = 7
-	case 2:
-		count = 6
-	case 3:
-		count = 3
-	default:
-		count = 1
-	}
-	return
-}
-
-func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
-	count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
-	if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
-		return count, nil
-	}
-	return count, err
-}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
-	vg.accessLock.Lock()
-	defer vg.accessLock.Unlock()
-
-	for i := 0; i < targetCount; i++ {
-		if c, e := vg.findAndGrow(topo, option); e == nil {
-			counter += c
-		} else {
-			return counter, e
-		}
-	}
-	return
-}
-
-func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
-	servers, e := vg.findEmptySlotsForOneVolume(topo, option)
-	if e != nil {
-		return 0, e
-	}
-	vid := topo.NextVolumeId()
-	err := vg.grow(topo, vid, option, servers...)
-	return len(servers), err
-}
-
-// 1. find the main data node
-// 1.1 collect all data nodes that have one free slot
-// 1.2 collect all racks that have rp.SameRackCount+1 free data nodes
-// 1.3 collect all data centers that have DiffRackCount+rp.SameRackCount+1 free data nodes
-// 2. find rest data nodes
-func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
-	//find main data center and other data centers
-	rp := option.ReplicaPlacement
-	mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
-		if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
-			return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
-		}
-		if len(node.Children()) < rp.DiffRackCount+1 {
-			return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
-		}
-		if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
-			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
-		}
-		possibleRacksCount := 0
-		for _, rack := range node.Children() {
-			possibleDataNodesCount := 0
-			for _, n := range rack.Children() {
-				if n.FreeSpace() >= 1 {
-					possibleDataNodesCount++
-				}
-			}
-			if possibleDataNodesCount >= rp.SameRackCount+1 {
-				possibleRacksCount++
-			}
-		}
-		if possibleRacksCount < rp.DiffRackCount+1 {
-			return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
-		}
-		return nil
-	})
-	if dc_err != nil {
-		return nil, dc_err
-	}
-
-	//find main rack and other racks
-	mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
-		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
-			return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
-		}
-		if node.FreeSpace() < rp.SameRackCount+1 {
-			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
-		}
-		if len(node.Children()) < rp.SameRackCount+1 {
-			// a bit faster way to test free racks
-			return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
-		}
-		possibleDataNodesCount := 0
-		for _, n := range node.Children() {
-			if n.FreeSpace() >= 1 {
-				possibleDataNodesCount++
-			}
-		}
-		if possibleDataNodesCount < rp.SameRackCount+1 {
-			return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
-		}
-		return nil
-	})
-	if rack_err != nil {
-		return nil, rack_err
-	}
-
-	//find main server and other servers
-	mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
-		if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
-			return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
-		}
-		if node.FreeSpace() < 1 {
-			return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
-		}
-		return nil
-	})
-	if server_err != nil {
-		return nil, server_err
-	}
-
-	servers = append(servers, mainServer.(*DataNode))
-	for _, server := range otherServers {
-		servers = append(servers, server.(*DataNode))
-	}
-	for _, rack := range otherRacks {
-		r := rand.Intn(rack.FreeSpace())
-		if server, e := rack.ReserveOneVolume(r); e == nil {
-			servers = append(servers, server)
-		} else {
-			return servers, e
-		}
-	}
-	for _, datacenter := range otherDataCenters {
-		r := rand.Intn(datacenter.FreeSpace())
-		if server, e := datacenter.ReserveOneVolume(r); e == nil {
-			servers = append(servers, server)
-		} else {
-			return servers, e
-		}
-	}
-	return
-}
-
-func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
-	for _, server := range servers {
-		if err := AllocateVolume(server, vid, option); err == nil {
-			vi := storage.VolumeInfo{
-				Id:               vid,
-				Size:             0,
-				Collection:       option.Collection,
-				ReplicaPlacement: option.ReplicaPlacement,
-				Ttl:              option.Ttl,
-				Version:          storage.CurrentVersion,
-			}
-			server.AddOrUpdateVolume(vi)
-			topo.RegisterVolumeLayout(vi, server)
-			glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
-		} else {
-			glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err)
-			return fmt.Errorf("Failed to assign %d: %v", vid, err)
-		}
-	}
-	return nil
-}
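findEmptySlotsForOneVolume reads its three counts straight off the ReplicaPlacement: a placement string such as "002" encodes, digit by digit, DiffDataCenterCount, DiffRackCount, and SameRackCount, and the total number of copies is their sum plus one. A small sketch (NewReplicaPlacementFromString lives in the storage package, outside this diff; the digit reading matches how the counts are used above):

package topology

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/go/storage"
)

func placementExample() {
	// "002": same data center, same rack, two extra copies on other servers.
	rp, err := storage.NewReplicaPlacementFromString("002")
	if err != nil {
		panic(err)
	}
	fmt.Println(rp.DiffDataCenterCount, rp.DiffRackCount, rp.SameRackCount) // 0 0 2
	fmt.Println(rp.GetCopyCount())                                         // 3
}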
diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go
deleted file mode 100644
index 15abfcc73..000000000
--- a/go/topology/volume_growth_test.go
+++ /dev/null
@@ -1,135 +0,0 @@
-package topology
-
-import (
-	"encoding/json"
-	"fmt"
-	"testing"
-
-	"github.com/chrislusf/seaweedfs/go/sequence"
-	"github.com/chrislusf/seaweedfs/go/storage"
-)
-
-var topologyLayout = `
-{
-  "dc1":{
-    "rack1":{
-      "server111":{
-        "volumes":[
-          {"id":1, "size":12312},
-          {"id":2, "size":12312},
-          {"id":3, "size":12312}
-        ],
-        "limit":3
-      },
-      "server112":{
-        "volumes":[
-          {"id":4, "size":12312},
-          {"id":5, "size":12312},
-          {"id":6, "size":12312}
-        ],
-        "limit":10
-      }
-    },
-    "rack2":{
-      "server121":{
-        "volumes":[
-          {"id":4, "size":12312},
-          {"id":5, "size":12312},
-          {"id":6, "size":12312}
-        ],
-        "limit":4
-      },
-      "server122":{
-        "volumes":[],
-        "limit":4
-      },
-      "server123":{
-        "volumes":[
-          {"id":2, "size":12312},
-          {"id":3, "size":12312},
-          {"id":4, "size":12312}
-        ],
-        "limit":5
-      }
-    }
-  },
-  "dc2":{
-  },
-  "dc3":{
-    "rack2":{
-      "server321":{
-        "volumes":[
-          {"id":1, "size":12312},
-          {"id":3, "size":12312},
-          {"id":5, "size":12312}
-        ],
-        "limit":4
-      }
-    }
-  }
-}
-`
-
-func setup(topologyLayout string) *Topology {
-	var data interface{}
-	err := json.Unmarshal([]byte(topologyLayout), &data)
-	if err != nil {
-		fmt.Println("error:", err)
-	}
-	fmt.Println("data:", data)
-
-	//need to connect all nodes first before server adding volumes
-	topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
-		sequence.NewMemorySequencer(), 32*1024, 5)
-	if err != nil {
-		panic("error: " + err.Error())
-	}
-	mTopology := data.(map[string]interface{})
-	for dcKey, dcValue := range mTopology {
-		dc := NewDataCenter(dcKey)
-		dcMap := dcValue.(map[string]interface{})
-		topo.LinkChildNode(dc)
-		for rackKey, rackValue := range dcMap {
-			rack := NewRack(rackKey)
-			rackMap := rackValue.(map[string]interface{})
-			dc.LinkChildNode(rack)
-			for serverKey, serverValue := range rackMap {
-				server := NewDataNode(serverKey)
-				serverMap := serverValue.(map[string]interface{})
-				rack.LinkChildNode(server)
-				for _, v := range serverMap["volumes"].([]interface{}) {
-					m := v.(map[string]interface{})
-					vi := storage.VolumeInfo{
-						Id:      storage.VolumeId(int64(m["id"].(float64))),
-						Size:    uint64(m["size"].(float64)),
-						Version: storage.CurrentVersion}
-					server.AddOrUpdateVolume(vi)
-				}
-				server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
-			}
-		}
-	}
-
-	return topo
-}
-
-func TestFindEmptySlotsForOneVolume(t *testing.T) {
-	topo := setup(topologyLayout)
-	vg := NewDefaultVolumeGrowth()
-	rp, _ := storage.NewReplicaPlacementFromString("002")
-	volumeGrowOption := &VolumeGrowOption{
-		Collection:       "",
-		ReplicaPlacement: rp,
-		DataCenter:       "dc1",
-		Rack:             "",
-		DataNode:         "",
-	}
-	servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
-	if err != nil {
-		fmt.Println("finding empty slots error :", err)
-		t.Fail()
-	}
-	for _, server := range servers {
-		fmt.Println("assigned node :", server.Id())
-	}
-}
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
deleted file mode 100644
index 7b6c0f117..000000000
--- a/go/topology/volume_layout.go
+++ /dev/null
@@ -1,226 +0,0 @@
-package topology
-
-import (
-	"errors"
-	"fmt"
-	"math/rand"
-	"sync"
-
-	"github.com/chrislusf/seaweedfs/go/glog"
-	"github.com/chrislusf/seaweedfs/go/storage"
-)
-
-// mapping from volume to its locations, inverted from server to volume
-type VolumeLayout struct {
-	rp              *storage.ReplicaPlacement
-	ttl             *storage.TTL
-	vid2location    map[storage.VolumeId]*VolumeLocationList
-	writables       []storage.VolumeId // transient array of writable volume id
-	volumeSizeLimit uint64
-	accessLock      sync.RWMutex
-}
-
-func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
-	return &VolumeLayout{
-		rp:              rp,
-		ttl:             ttl,
-		vid2location:    make(map[storage.VolumeId]*VolumeLocationList),
-		writables:       *new([]storage.VolumeId),
-		volumeSizeLimit: volumeSizeLimit,
-	}
-}
-
-func (vl *VolumeLayout) String() string {
-	return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
-}
-
-func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
-
-	if _, ok := vl.vid2location[v.Id]; !ok {
-		vl.vid2location[v.Id] = NewVolumeLocationList()
-	}
-	vl.vid2location[v.Id].Set(dn)
-	glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
-	if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
-		vl.addToWritable(v.Id)
-	} else {
-		vl.removeFromWritable(v.Id)
-	}
-}
-
-func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
-
-	vl.removeFromWritable(v.Id)
-	delete(vl.vid2location, v.Id)
-}
-
-func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
-	for _, id := range vl.writables {
-		if vid == id {
-			return
-		}
-	}
-	vl.writables = append(vl.writables, vid)
-}
-
-func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
-	return uint64(v.Size) < vl.volumeSizeLimit &&
-		v.Version == storage.CurrentVersion &&
-		!v.ReadOnly
-}
-
-func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
-	vl.accessLock.RLock()
-	defer vl.accessLock.RUnlock()
-
-	if location := vl.vid2location[vid]; location != nil {
-		return location.list
-	}
-	return nil
-}
-
-func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
-	vl.accessLock.RLock()
-	defer vl.accessLock.RUnlock()
-
-	for _, location := range vl.vid2location {
-		nodes = append(nodes, location.list...)
-	}
-	return
-}
-
-func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
-	vl.accessLock.RLock()
-	defer vl.accessLock.RUnlock()
-
-	len_writers := len(vl.writables)
-	if len_writers <= 0 {
-		glog.V(0).Infoln("No more writable volumes!")
-		return nil, 0, nil, errors.New("No more writable volumes!")
-	}
-	if option.DataCenter == "" {
-		vid := vl.writables[rand.Intn(len_writers)]
-		locationList := vl.vid2location[vid]
-		if locationList != nil {
-			return &vid, count, locationList, nil
-		}
-		return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
-	}
-	var vid storage.VolumeId
-	var locationList *VolumeLocationList
-	counter := 0
-	for _, v := range vl.writables {
-		volumeLocationList := vl.vid2location[v]
-		for _, dn := range volumeLocationList.list {
-			if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
-				if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
-					continue
-				}
-				if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
-					continue
-				}
-				counter++
-				if rand.Intn(counter) < 1 {
-					vid, locationList = v, volumeLocationList
-				}
-			}
-		}
-	}
-	return &vid, count, locationList, nil
-}
-
-func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
-	vl.accessLock.RLock()
-	defer vl.accessLock.RUnlock()
-
-	if option.DataCenter == "" {
-		return len(vl.writables)
-	}
-	counter := 0
-	for _, v := range vl.writables {
-		for _, dn := range vl.vid2location[v].list {
-			if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
-				if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
-					continue
-				}
-				if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
-					continue
-				}
-				counter++
-			}
-		}
-	}
-	return counter
-}
-
-func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
-	toDeleteIndex := -1
-	for k, id := range vl.writables {
-		if id == vid {
-			toDeleteIndex = k
-			break
-		}
-	}
-	if toDeleteIndex >= 0 {
-		glog.V(0).Infoln("Volume", vid, "becomes unwritable")
-		vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...)
-		return true
-	}
-	return false
-}
-func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
-	for _, v := range vl.writables {
-		if v == vid {
-			return false
-		}
-	}
-	glog.V(0).Infoln("Volume", vid, "becomes writable")
-	vl.writables = append(vl.writables, vid)
-	return true
-}
-
-func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
-
-	if location, ok := vl.vid2location[vid]; ok {
-		if location.Remove(dn) {
-			if location.Length() < vl.rp.GetCopyCount() {
-				glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
-				return vl.removeFromWritable(vid)
-			}
-		}
-	}
-	return false
-}
-func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
-
-	vl.vid2location[vid].Set(dn)
-	if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
-		return vl.setVolumeWritable(vid)
-	}
-	return false
-}
-
-func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
-	vl.accessLock.Lock()
-	defer vl.accessLock.Unlock()
-
-	// glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
-	return vl.removeFromWritable(vid)
-}
-
-func (vl *VolumeLayout) ToMap() map[string]interface{} {
-	m := make(map[string]interface{})
-	m["replication"] = vl.rp.String()
-	m["ttl"] = vl.ttl.String()
-	m["writables"] = vl.writables
-	//m["locations"] = vl.vid2location
-	return m
-}
diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go
deleted file mode 100644
index d5eaf5e92..000000000
--- a/go/topology/volume_location_list.go
+++ /dev/null
@@ -1,65 +0,0 @@
-package topology
-
-import (
-	"fmt"
-)
-
-type VolumeLocationList struct {
-	list []*DataNode
-}
-
-func NewVolumeLocationList() *VolumeLocationList {
-	return &VolumeLocationList{}
-}
-
-func (dnll *VolumeLocationList) String() string {
-	return fmt.Sprintf("%v", dnll.list)
-}
-
-func (dnll *VolumeLocationList) Head() *DataNode {
-	//mark first node as master volume
-	return dnll.list[0]
-}
-
-func (dnll *VolumeLocationList) Length() int {
-	return len(dnll.list)
-}
-
-func (dnll *VolumeLocationList) Set(loc *DataNode) {
-	for i := 0; i < len(dnll.list); i++ {
-		if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
-			dnll.list[i] = loc
-			return
-		}
-	}
-	dnll.list = append(dnll.list, loc)
-}
-
-func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
-	for i, dnl := range dnll.list {
-		if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
-			dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
-			return true
-		}
-	}
-	return false
-}
-
-func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
-	var changed bool
-	for _, dnl := range dnll.list {
-		if dnl.LastSeen < freshThreshHold {
-			changed = true
-			break
-		}
-	}
-	if changed {
-		var l []*DataNode
-		for _, dnl := range dnll.list {
-			if dnl.LastSeen >= freshThreshHold {
-				l = append(l, dnl)
-			}
-		}
-		dnll.list = l
-	}
-}
