path: root/weed/topology
Diffstat (limited to 'weed/topology')
-rw-r--r--  weed/topology/allocate_volume.go            35
-rw-r--r--  weed/topology/cluster_commands.go           31
-rw-r--r--  weed/topology/collection.go                 57
-rw-r--r--  weed/topology/configuration.go              65
-rw-r--r--  weed/topology/configuration_test.go         42
-rw-r--r--  weed/topology/data_center.go                40
-rw-r--r--  weed/topology/data_node.go                 115
-rw-r--r--  weed/topology/node.go                      272
-rw-r--r--  weed/topology/rack.go                       65
-rw-r--r--  weed/topology/store_replicate.go           150
-rw-r--r--  weed/topology/topo_test.go                  17
-rw-r--r--  weed/topology/topology.go                  189
-rw-r--r--  weed/topology/topology_event_handling.go    74
-rw-r--r--  weed/topology/topology_map.go               53
-rw-r--r--  weed/topology/topology_vacuum.go           158
-rw-r--r--  weed/topology/volume_growth.go             211
-rw-r--r--  weed/topology/volume_growth_test.go        135
-rw-r--r--  weed/topology/volume_layout.go             226
-rw-r--r--  weed/topology/volume_location_list.go       65
19 files changed, 2000 insertions, 0 deletions
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go
new file mode 100644
index 000000000..7b267a805
--- /dev/null
+++ b/weed/topology/allocate_volume.go
@@ -0,0 +1,35 @@
+package topology
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/url"
+
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type AllocateVolumeResult struct {
+ Error string
+}
+
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("collection", option.Collection)
+ values.Add("replication", option.ReplicaPlacement.String())
+ values.Add("ttl", option.Ttl.String())
+ jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
+ if err != nil {
+ return err
+ }
+ var ret AllocateVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return fmt.Errorf("Invalid JSON result for %s: %s", "/admin/assign_volum", string(jsonBlob))
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
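Note: AllocateVolume above boils down to a single form POST against the volume server's admin endpoint, with the reply decoded into AllocateVolumeResult. A minimal standalone sketch of the request it issues (host, volume id, and option values are all made up here):

package main

import (
    "fmt"
    "net/url"
)

func main() {
    // These values mirror what AllocateVolume sends; the concrete values are hypothetical.
    values := url.Values{}
    values.Add("volume", "7")        // vid.String()
    values.Add("collection", "pics") // option.Collection
    values.Add("replication", "001") // option.ReplicaPlacement.String()
    values.Add("ttl", "3d")          // option.Ttl.String()
    // util.Post sends this as a form POST and the caller surfaces ret.Error on failure.
    fmt.Println("POST http://127.0.0.1:8080/admin/assign_volume")
    fmt.Println("     " + values.Encode())
}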
diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go
new file mode 100644
index 000000000..53f45ec4d
--- /dev/null
+++ b/weed/topology/cluster_commands.go
@@ -0,0 +1,31 @@
+package topology
+
+import (
+ "github.com/chrislusf/raft"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+type MaxVolumeIdCommand struct {
+ MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
+}
+
+func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
+ return &MaxVolumeIdCommand{
+ MaxVolumeId: value,
+ }
+}
+
+func (c *MaxVolumeIdCommand) CommandName() string {
+ return "MaxVolumeId"
+}
+
+func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
+ topo := server.Context().(*Topology)
+ before := topo.GetMaxVolumeId()
+ topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
+
+ glog.V(4).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
+
+ return nil, nil
+}
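For context, a raft command like MaxVolumeIdCommand only takes effect after it is registered with the raft library (so the log can decode it) and then proposed by the leader. A sketch of that wiring, assuming the goraft-style RegisterCommand/Do API exposed by github.com/chrislusf/raft; the actual registration lives elsewhere in the code base:

package topology

import "github.com/chrislusf/raft"

// Sketch only: register the command type once so raft can decode it from its log.
func init() {
    raft.RegisterCommand(&MaxVolumeIdCommand{})
}

// The leader then proposes it, e.g. from Topology.NextVolumeId in topology.go below:
//   topo.RaftServer.Do(NewMaxVolumeIdCommand(nextVolumeId))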
diff --git a/weed/topology/collection.go b/weed/topology/collection.go
new file mode 100644
index 000000000..a17f0c961
--- /dev/null
+++ b/weed/topology/collection.go
@@ -0,0 +1,57 @@
+package topology
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type Collection struct {
+ Name string
+ volumeSizeLimit uint64
+ storageType2VolumeLayout *util.ConcurrentReadMap
+}
+
+func NewCollection(name string, volumeSizeLimit uint64) *Collection {
+ c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
+ c.storageType2VolumeLayout = util.NewConcurrentReadMap()
+ return c
+}
+
+func (c *Collection) String() string {
+ return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
+}
+
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+ keyString := rp.String()
+ if ttl != nil {
+ keyString += ttl.String()
+ }
+ vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
+ return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ })
+ return vl.(*VolumeLayout)
+}
+
+func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
+ for _, vl := range c.storageType2VolumeLayout.Items() {
+ if vl != nil {
+ if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
+ return list
+ }
+ }
+ }
+ return nil
+}
+
+func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
+ for _, vl := range c.storageType2VolumeLayout.Items() {
+ if vl != nil {
+ if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
+ nodes = append(nodes, list...)
+ }
+ }
+ }
+ return
+}
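A small usage sketch (the collection name and size limit are made up): volume layouts are cached per replica-placement/TTL key in the concurrent map, so asking twice with the same key returns the same *VolumeLayout.

package main

import (
    "fmt"

    "github.com/chrislusf/seaweedfs/weed/storage"
    "github.com/chrislusf/seaweedfs/weed/topology"
)

func main() {
    c := topology.NewCollection("pictures", 32*1024*1024) // hypothetical collection
    rp, err := storage.NewReplicaPlacementFromString("001")
    if err != nil {
        panic(err)
    }
    // A nil TTL keys the layout by replica placement alone ("001");
    // a non-nil TTL appends its string form to the key.
    vl1 := c.GetOrCreateVolumeLayout(rp, nil)
    vl2 := c.GetOrCreateVolumeLayout(rp, nil)
    fmt.Println(vl1 == vl2) // true: the layout is created once and reused
}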
diff --git a/weed/topology/configuration.go b/weed/topology/configuration.go
new file mode 100644
index 000000000..ffcebb59c
--- /dev/null
+++ b/weed/topology/configuration.go
@@ -0,0 +1,65 @@
+package topology
+
+import (
+ "encoding/xml"
+)
+
+type loc struct {
+ dcName string
+ rackName string
+}
+type rack struct {
+ Name string `xml:"name,attr"`
+ Ips []string `xml:"Ip"`
+}
+type dataCenter struct {
+ Name string `xml:"name,attr"`
+ Racks []rack `xml:"Rack"`
+}
+type topology struct {
+ DataCenters []dataCenter `xml:"DataCenter"`
+}
+type Configuration struct {
+ XMLName xml.Name `xml:"Configuration"`
+ Topo topology `xml:"Topology"`
+ ip2location map[string]loc
+}
+
+func NewConfiguration(b []byte) (*Configuration, error) {
+ c := &Configuration{}
+ err := xml.Unmarshal(b, c)
+ c.ip2location = make(map[string]loc)
+ for _, dc := range c.Topo.DataCenters {
+ for _, rack := range dc.Racks {
+ for _, ip := range rack.Ips {
+ c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
+ }
+ }
+ }
+ return c, err
+}
+
+func (c *Configuration) String() string {
+ if b, e := xml.MarshalIndent(c, " ", " "); e == nil {
+ return string(b)
+ }
+ return ""
+}
+
+func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
+ if c != nil && c.ip2location != nil {
+ if loc, ok := c.ip2location[ip]; ok {
+ return loc.dcName, loc.rackName
+ }
+ }
+
+ if dcName == "" {
+ dcName = "DefaultDataCenter"
+ }
+
+ if rackName == "" {
+ rackName = "DefaultRack"
+ }
+
+ return dcName, rackName
+}
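A usage sketch of Locate (the XML and IPs are hypothetical): an IP listed in the configuration resolves to its configured data center and rack, while any other IP falls back to the defaults.

package main

import (
    "fmt"

    "github.com/chrislusf/seaweedfs/weed/topology"
)

func main() {
    conf := `<Configuration>
  <Topology>
    <DataCenter name="dc1">
      <Rack name="rack1">
        <Ip>192.168.1.1</Ip>
      </Rack>
    </DataCenter>
  </Topology>
</Configuration>`
    c, err := topology.NewConfiguration([]byte(conf))
    if err != nil {
        panic(err)
    }
    fmt.Println(c.Locate("192.168.1.1", "", "")) // dc1 rack1 (from ip2location)
    fmt.Println(c.Locate("10.0.0.9", "", ""))    // DefaultDataCenter DefaultRack (fallback)
}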
diff --git a/weed/topology/configuration_test.go b/weed/topology/configuration_test.go
new file mode 100644
index 000000000..0a353d16e
--- /dev/null
+++ b/weed/topology/configuration_test.go
@@ -0,0 +1,42 @@
+package topology
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestLoadConfiguration(t *testing.T) {
+
+ confContent := `
+
+<?xml version="1.0" encoding="UTF-8" ?>
+<Configuration>
+ <Topology>
+ <DataCenter name="dc1">
+ <Rack name="rack1">
+ <Ip>192.168.1.1</Ip>
+ </Rack>
+ </DataCenter>
+ <DataCenter name="dc2">
+ <Rack name="rack1">
+ <Ip>192.168.1.2</Ip>
+ </Rack>
+ <Rack name="rack2">
+ <Ip>192.168.1.3</Ip>
+ <Ip>192.168.1.4</Ip>
+ </Rack>
+ </DataCenter>
+ </Topology>
+</Configuration>
+`
+ c, err := NewConfiguration([]byte(confContent))
+
+ fmt.Printf("%s\n", c)
+ if err != nil {
+ t.Fatalf("unmarshal error:%v", err)
+ }
+
+ if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
+ t.Fatalf("unmarshal error:%s", c)
+ }
+}
diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go
new file mode 100644
index 000000000..bcf2dfd31
--- /dev/null
+++ b/weed/topology/data_center.go
@@ -0,0 +1,40 @@
+package topology
+
+type DataCenter struct {
+ NodeImpl
+}
+
+func NewDataCenter(id string) *DataCenter {
+ dc := &DataCenter{}
+ dc.id = NodeId(id)
+ dc.nodeType = "DataCenter"
+ dc.children = make(map[NodeId]Node)
+ dc.NodeImpl.value = dc
+ return dc
+}
+
+func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
+ for _, c := range dc.Children() {
+ rack := c.(*Rack)
+ if string(rack.Id()) == rackName {
+ return rack
+ }
+ }
+ rack := NewRack(rackName)
+ dc.LinkChildNode(rack)
+ return rack
+}
+
+func (dc *DataCenter) ToMap() interface{} {
+ m := make(map[string]interface{})
+ m["Id"] = dc.Id()
+ m["Max"] = dc.GetMaxVolumeCount()
+ m["Free"] = dc.FreeSpace()
+ var racks []interface{}
+ for _, c := range dc.Children() {
+ rack := c.(*Rack)
+ racks = append(racks, rack.ToMap())
+ }
+ m["Racks"] = racks
+ return m
+}
diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go
new file mode 100644
index 000000000..1404d4aa8
--- /dev/null
+++ b/weed/topology/data_node.go
@@ -0,0 +1,115 @@
+package topology
+
+import (
+ "fmt"
+ "strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+type DataNode struct {
+ NodeImpl
+ volumes map[storage.VolumeId]storage.VolumeInfo
+ Ip string
+ Port int
+ PublicUrl string
+ LastSeen int64 // unix time in seconds
+ Dead bool
+}
+
+func NewDataNode(id string) *DataNode {
+ s := &DataNode{}
+ s.id = NodeId(id)
+ s.nodeType = "DataNode"
+ s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
+ s.NodeImpl.value = s
+ return s
+}
+
+func (dn *DataNode) String() string {
+ dn.RLock()
+ defer dn.RUnlock()
+ return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
+}
+
+func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
+ dn.Lock()
+ defer dn.Unlock()
+ if _, ok := dn.volumes[v.Id]; !ok {
+ dn.volumes[v.Id] = v
+ dn.UpAdjustVolumeCountDelta(1)
+ if !v.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(1)
+ }
+ dn.UpAdjustMaxVolumeId(v.Id)
+ } else {
+ dn.volumes[v.Id] = v
+ }
+}
+
+func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
+ actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
+ for _, v := range actualVolumes {
+ actualVolumeMap[v.Id] = v
+ }
+ dn.RLock()
+ for vid, v := range dn.volumes {
+ if _, ok := actualVolumeMap[vid]; !ok {
+ glog.V(0).Infoln("Deleting volume id:", vid)
+ delete(dn.volumes, vid)
+ deletedVolumes = append(deletedVolumes, v)
+ dn.UpAdjustVolumeCountDelta(-1)
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
+ } //TODO: adjust max volume id, if need to reclaim volume ids
+ dn.RUnlock()
+ for _, v := range actualVolumes {
+ dn.AddOrUpdateVolume(v)
+ }
+ return
+}
+
+func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
+ dn.RLock()
+ for _, v := range dn.volumes {
+ ret = append(ret, v)
+ }
+ dn.RUnlock()
+ return ret
+}
+
+func (dn *DataNode) GetDataCenter() *DataCenter {
+ return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
+}
+
+func (dn *DataNode) GetRack() *Rack {
+ return dn.Parent().(*NodeImpl).value.(*Rack)
+}
+
+func (dn *DataNode) GetTopology() *Topology {
+ p := dn.Parent()
+ for p.Parent() != nil {
+ p = p.Parent()
+ }
+ t := p.(*Topology)
+ return t
+}
+
+func (dn *DataNode) MatchLocation(ip string, port int) bool {
+ return dn.Ip == ip && dn.Port == port
+}
+
+func (dn *DataNode) Url() string {
+ return dn.Ip + ":" + strconv.Itoa(dn.Port)
+}
+
+func (dn *DataNode) ToMap() interface{} {
+ ret := make(map[string]interface{})
+ ret["Url"] = dn.Url()
+ ret["Volumes"] = dn.GetVolumeCount()
+ ret["Max"] = dn.GetMaxVolumeCount()
+ ret["Free"] = dn.FreeSpace()
+ ret["PublicUrl"] = dn.PublicUrl
+ return ret
+}
diff --git a/weed/topology/node.go b/weed/topology/node.go
new file mode 100644
index 000000000..4ce35f4b0
--- /dev/null
+++ b/weed/topology/node.go
@@ -0,0 +1,272 @@
+package topology
+
+import (
+ "errors"
+ "math/rand"
+ "strings"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+type NodeId string
+type Node interface {
+ Id() NodeId
+ String() string
+ FreeSpace() int
+ ReserveOneVolume(r int) (*DataNode, error)
+ UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
+ UpAdjustVolumeCountDelta(volumeCountDelta int)
+ UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
+ UpAdjustMaxVolumeId(vid storage.VolumeId)
+
+ GetVolumeCount() int
+ GetActiveVolumeCount() int
+ GetMaxVolumeCount() int
+ GetMaxVolumeId() storage.VolumeId
+ SetParent(Node)
+ LinkChildNode(node Node)
+ UnlinkChildNode(nodeId NodeId)
+ CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
+
+ IsDataNode() bool
+ IsRack() bool
+ IsDataCenter() bool
+ Children() []Node
+ Parent() Node
+
+ GetValue() interface{} //get reference to the topology, dc, rack, or data node
+}
+type NodeImpl struct {
+ id NodeId
+ volumeCount int
+ activeVolumeCount int
+ maxVolumeCount int
+ parent Node
+ sync.RWMutex // lock children
+ children map[NodeId]Node
+ maxVolumeId storage.VolumeId
+
+ //for rack, data center, topology
+ nodeType string
+ value interface{}
+}
+
+// the first node must satisfy filterFirstNodeFn(); the rest of the nodes must each have at least one free slot
+func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
+ candidates := make([]Node, 0, len(n.children))
+ var errs []string
+ n.RLock()
+ for _, node := range n.children {
+ if err := filterFirstNodeFn(node); err == nil {
+ candidates = append(candidates, node)
+ } else {
+ errs = append(errs, string(node.Id())+":"+err.Error())
+ }
+ }
+ n.RUnlock()
+ if len(candidates) == 0 {
+ return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
+ }
+ firstNode = candidates[rand.Intn(len(candidates))]
+ glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
+
+ restNodes = make([]Node, numberOfNodes-1)
+ candidates = candidates[:0]
+ n.RLock()
+ for _, node := range n.children {
+ if node.Id() == firstNode.Id() {
+ continue
+ }
+ if node.FreeSpace() <= 0 {
+ continue
+ }
+ glog.V(2).Infoln("select rest node candidate:", node.Id())
+ candidates = append(candidates, node)
+ }
+ n.RUnlock()
+ glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+ ret := len(restNodes) == 0
+ for k, node := range candidates {
+ if k < len(restNodes) {
+ restNodes[k] = node
+ if k == len(restNodes)-1 {
+ ret = true
+ }
+ } else {
+ r := rand.Intn(k + 1)
+ if r < len(restNodes) {
+ restNodes[r] = node
+ }
+ }
+ }
+ if !ret {
+ glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+ err = errors.New("Not enough data node found!")
+ }
+ return
+}
+
+func (n *NodeImpl) IsDataNode() bool {
+ return n.nodeType == "DataNode"
+}
+func (n *NodeImpl) IsRack() bool {
+ return n.nodeType == "Rack"
+}
+func (n *NodeImpl) IsDataCenter() bool {
+ return n.nodeType == "DataCenter"
+}
+func (n *NodeImpl) String() string {
+ if n.parent != nil {
+ return n.parent.String() + ":" + string(n.id)
+ }
+ return string(n.id)
+}
+func (n *NodeImpl) Id() NodeId {
+ return n.id
+}
+func (n *NodeImpl) FreeSpace() int {
+ return n.maxVolumeCount - n.volumeCount
+}
+func (n *NodeImpl) SetParent(node Node) {
+ n.parent = node
+}
+func (n *NodeImpl) Children() (ret []Node) {
+ n.RLock()
+ defer n.RUnlock()
+ for _, c := range n.children {
+ ret = append(ret, c)
+ }
+ return ret
+}
+func (n *NodeImpl) Parent() Node {
+ return n.parent
+}
+func (n *NodeImpl) GetValue() interface{} {
+ return n.value
+}
+func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
+ n.RLock()
+ defer n.RUnlock()
+ for _, node := range n.children {
+ freeSpace := node.FreeSpace()
+ // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
+ if freeSpace <= 0 {
+ continue
+ }
+ if r >= freeSpace {
+ r -= freeSpace
+ } else {
+ if node.IsDataNode() && node.FreeSpace() > 0 {
+ // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
+ return node.(*DataNode), nil
+ }
+ assignedNode, err = node.ReserveOneVolume(r)
+ if err != nil {
+ return
+ }
+ }
+ }
+ return
+}
+
+func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
+ n.maxVolumeCount += maxVolumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
+ n.volumeCount += volumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
+ n.activeVolumeCount += activeVolumeCountDelta
+ if n.parent != nil {
+ n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
+ }
+}
+func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
+ if n.maxVolumeId < vid {
+ n.maxVolumeId = vid
+ if n.parent != nil {
+ n.parent.UpAdjustMaxVolumeId(vid)
+ }
+ }
+}
+func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
+ return n.maxVolumeId
+}
+func (n *NodeImpl) GetVolumeCount() int {
+ return n.volumeCount
+}
+func (n *NodeImpl) GetActiveVolumeCount() int {
+ return n.activeVolumeCount
+}
+func (n *NodeImpl) GetMaxVolumeCount() int {
+ return n.maxVolumeCount
+}
+
+func (n *NodeImpl) LinkChildNode(node Node) {
+ n.Lock()
+ defer n.Unlock()
+ if n.children[node.Id()] == nil {
+ n.children[node.Id()] = node
+ n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
+ n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
+ n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
+ n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
+ node.SetParent(n)
+ glog.V(0).Infoln(n, "adds child", node.Id())
+ }
+}
+
+func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
+ n.Lock()
+ defer n.Unlock()
+ node := n.children[nodeId]
+ if node != nil {
+ node.SetParent(nil)
+ delete(n.children, node.Id())
+ n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
+ n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
+ n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
+ glog.V(0).Infoln(n, "removes", node, "volumeCount =", n.activeVolumeCount)
+ }
+}
+
+func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
+ if n.IsRack() {
+ for _, c := range n.Children() {
+ dn := c.(*DataNode) //can not cast n to DataNode
+ if dn.LastSeen < freshThreshHold {
+ if !dn.Dead {
+ dn.Dead = true
+ n.GetTopology().chanDeadDataNodes <- dn
+ }
+ }
+ for _, v := range dn.GetVolumes() {
+ if uint64(v.Size) >= volumeSizeLimit {
+ //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
+ n.GetTopology().chanFullVolumes <- v
+ }
+ }
+ }
+ } else {
+ for _, c := range n.Children() {
+ c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
+ }
+ }
+}
+
+func (n *NodeImpl) GetTopology() *Topology {
+ var p Node
+ p = n
+ for p.Parent() != nil {
+ p = p.Parent()
+ }
+ return p.GetValue().(*Topology)
+}
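The rest-node selection in RandomlyPickNodes above is a reservoir sample of size numberOfNodes-1 over the children that still have free slots. A standalone sketch of the same selection step, detached from the Node types:

package main

import (
    "fmt"
    "math/rand"
)

// reservoirSample keeps k items chosen uniformly at random from candidates,
// mirroring the loop over the candidate children in RandomlyPickNodes.
func reservoirSample(candidates []string, k int) []string {
    picked := make([]string, 0, k)
    for i, c := range candidates {
        if i < k {
            picked = append(picked, c) // fill the reservoir first
        } else if r := rand.Intn(i + 1); r < k {
            picked[r] = c // replace a random slot with decreasing probability
        }
    }
    return picked
}

func main() {
    nodes := []string{"dn1", "dn2", "dn3", "dn4", "dn5"}
    fmt.Println(reservoirSample(nodes, 2))
}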
diff --git a/weed/topology/rack.go b/weed/topology/rack.go
new file mode 100644
index 000000000..1ca2f8de8
--- /dev/null
+++ b/weed/topology/rack.go
@@ -0,0 +1,65 @@
+package topology
+
+import (
+ "strconv"
+ "time"
+)
+
+type Rack struct {
+ NodeImpl
+}
+
+func NewRack(id string) *Rack {
+ r := &Rack{}
+ r.id = NodeId(id)
+ r.nodeType = "Rack"
+ r.children = make(map[NodeId]Node)
+ r.NodeImpl.value = r
+ return r
+}
+
+func (r *Rack) FindDataNode(ip string, port int) *DataNode {
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ return dn
+ }
+ }
+ return nil
+}
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ if dn.MatchLocation(ip, port) {
+ dn.LastSeen = time.Now().Unix()
+ if dn.Dead {
+ dn.Dead = false
+ r.GetTopology().chanRecoveredDataNodes <- dn
+ dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
+ }
+ return dn
+ }
+ }
+ dn := NewDataNode(ip + ":" + strconv.Itoa(port))
+ dn.Ip = ip
+ dn.Port = port
+ dn.PublicUrl = publicUrl
+ dn.maxVolumeCount = maxVolumeCount
+ dn.LastSeen = time.Now().Unix()
+ r.LinkChildNode(dn)
+ return dn
+}
+
+func (r *Rack) ToMap() interface{} {
+ m := make(map[string]interface{})
+ m["Id"] = r.Id()
+ m["Max"] = r.GetMaxVolumeCount()
+ m["Free"] = r.FreeSpace()
+ var dns []interface{}
+ for _, c := range r.Children() {
+ dn := c.(*DataNode)
+ dns = append(dns, dn.ToMap())
+ }
+ m["DataNodes"] = dns
+ return m
+}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
new file mode 100644
index 000000000..be5777167
--- /dev/null
+++ b/weed/topology/store_replicate.go
@@ -0,0 +1,150 @@
+package topology
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "net/http"
+ "strconv"
+ "strings"
+
+ "net/url"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func ReplicatedWrite(masterNode string, s *storage.Store,
+ volumeId storage.VolumeId, needle *storage.Needle,
+ r *http.Request) (size uint32, errorStatus string) {
+
+ //check JWT
+ jwt := security.GetJwt(r)
+
+ ret, err := s.Write(volumeId, needle)
+ needToReplicate := !s.HasVolume(volumeId)
+ if err != nil {
+ errorStatus = "Failed to write to local disk (" + err.Error() + ")"
+ } else if ret > 0 {
+ needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
+ } else {
+ errorStatus = "Failed to write to local disk"
+ }
+ if !needToReplicate && ret > 0 {
+ needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "replicate" {
+
+ if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
+ u := url.URL{
+ Scheme: "http",
+ Host: location.Url,
+ Path: r.URL.Path,
+ }
+ q := url.Values{
+ "type": {"replicate"},
+ }
+ if needle.LastModified > 0 {
+ q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
+ }
+ if needle.IsChunkedManifest() {
+ q.Set("cm", "true")
+ }
+ u.RawQuery = q.Encode()
+ _, err := operation.Upload(u.String(),
+ string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
+ jwt)
+ return err
+ }); err != nil {
+ ret = 0
+ errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
+ }
+ }
+ }
+ size = ret
+ return
+}
+
+func ReplicatedDelete(masterNode string, store *storage.Store,
+ volumeId storage.VolumeId, n *storage.Needle,
+ r *http.Request) (uint32, error) {
+
+ //check JWT
+ jwt := security.GetJwt(r)
+
+ ret, err := store.Delete(volumeId, n)
+ if err != nil {
+ glog.V(0).Infoln("delete error:", err)
+ return ret, err
+ }
+
+ needToReplicate := !store.HasVolume(volumeId)
+ if !needToReplicate && ret > 0 {
+ needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
+ }
+ if needToReplicate { //send to other replica locations
+ if r.FormValue("type") != "replicate" {
+ if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
+ return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
+ }); err != nil {
+ ret = 0
+ }
+ }
+ }
+ return ret, err
+}
+
+type DistributedOperationResult map[string]error
+
+func (dr DistributedOperationResult) Error() error {
+ var errs []string
+ for k, v := range dr {
+ if v != nil {
+ errs = append(errs, fmt.Sprintf("[%s]: %v", k, v))
+ }
+ }
+ if len(errs) == 0 {
+ return nil
+ }
+ return errors.New(strings.Join(errs, "\n"))
+}
+
+type RemoteResult struct {
+ Host string
+ Error error
+}
+
+func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
+ if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
+ length := 0
+ selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
+ results := make(chan RemoteResult)
+ for _, location := range lookupResult.Locations {
+ if location.Url != selfUrl {
+ length++
+ go func(location operation.Location, results chan RemoteResult) {
+ results <- RemoteResult{location.Url, op(location)}
+ }(location, results)
+ }
+ }
+ ret := DistributedOperationResult(make(map[string]error))
+ for i := 0; i < length; i++ {
+ result := <-results
+ ret[result.Host] = result.Error
+ }
+ if volume := store.GetVolume(volumeId); volume != nil {
+ if length+1 < volume.ReplicaPlacement.GetCopyCount() {
+ return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
+ }
+ }
+ return ret.Error()
+ } else {
+ glog.V(0).Infoln()
+ return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr)
+ }
+ return nil
+}
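For reference, the replica URL that ReplicatedWrite builds for each other location looks like the following sketch (host, fid, and timestamp are made up); the type=replicate parameter is what stops the receiving volume server from fanning out again.

package main

import (
    "fmt"
    "net/url"
    "strconv"
)

func main() {
    u := url.URL{
        Scheme: "http",
        Host:   "replica-host:8080", // location.Url of another replica (hypothetical)
        Path:   "/3,01637037d6",     // the original request path (hypothetical fid)
    }
    q := url.Values{"type": {"replicate"}}
    q.Set("ts", strconv.FormatUint(1462300800, 10)) // needle.LastModified, if set
    u.RawQuery = q.Encode()
    fmt.Println(u.String())
    // -> http://replica-host:8080/3,01637037d6?ts=1462300800&type=replicate
}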
diff --git a/weed/topology/topo_test.go b/weed/topology/topo_test.go
new file mode 100644
index 000000000..9a0dbc6b8
--- /dev/null
+++ b/weed/topology/topo_test.go
@@ -0,0 +1,17 @@
+package topology
+
+import (
+ "testing"
+)
+
+func TestRemoveDataCenter(t *testing.T) {
+ topo := setup(topologyLayout)
+ topo.UnlinkChildNode(NodeId("dc2"))
+ if topo.GetActiveVolumeCount() != 15 {
+ t.Fail()
+ }
+ topo.UnlinkChildNode(NodeId("dc3"))
+ if topo.GetActiveVolumeCount() != 12 {
+ t.Fail()
+ }
+}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
new file mode 100644
index 000000000..04b500053
--- /dev/null
+++ b/weed/topology/topology.go
@@ -0,0 +1,189 @@
+package topology
+
+import (
+ "errors"
+ "io/ioutil"
+ "math/rand"
+
+ "github.com/chrislusf/raft"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/sequence"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type Topology struct {
+ NodeImpl
+
+ collectionMap *util.ConcurrentReadMap
+
+ pulse int64
+
+ volumeSizeLimit uint64
+
+ Sequence sequence.Sequencer
+
+ chanDeadDataNodes chan *DataNode
+ chanRecoveredDataNodes chan *DataNode
+ chanFullVolumes chan storage.VolumeInfo
+
+ configuration *Configuration
+
+ RaftServer raft.Server
+}
+
+func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
+ t := &Topology{}
+ t.id = NodeId(id)
+ t.nodeType = "Topology"
+ t.NodeImpl.value = t
+ t.children = make(map[NodeId]Node)
+ t.collectionMap = util.NewConcurrentReadMap()
+ t.pulse = int64(pulse)
+ t.volumeSizeLimit = volumeSizeLimit
+
+ t.Sequence = seq
+
+ t.chanDeadDataNodes = make(chan *DataNode)
+ t.chanRecoveredDataNodes = make(chan *DataNode)
+ t.chanFullVolumes = make(chan storage.VolumeInfo)
+
+ err := t.loadConfiguration(confFile)
+
+ return t, err
+}
+
+func (t *Topology) IsLeader() bool {
+ if leader, e := t.Leader(); e == nil {
+ return leader == t.RaftServer.Name()
+ }
+ return false
+}
+
+func (t *Topology) Leader() (string, error) {
+ l := ""
+ if t.RaftServer != nil {
+ l = t.RaftServer.Leader()
+ } else {
+ return "", errors.New("Raft Server not ready yet!")
+ }
+
+ if l == "" {
+ // We are a single node cluster, we are the leader
+ return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
+ }
+
+ return l, nil
+}
+
+func (t *Topology) loadConfiguration(configurationFile string) error {
+ b, e := ioutil.ReadFile(configurationFile)
+ if e == nil {
+ t.configuration, e = NewConfiguration(b)
+ return e
+ }
+ glog.V(0).Infoln("Using default configurations.")
+ return nil
+}
+
+func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
+ //maybe an issue if lots of collections?
+ if collection == "" {
+ for _, c := range t.collectionMap.Items() {
+ if list := c.(*Collection).Lookup(vid); list != nil {
+ return list
+ }
+ }
+ } else {
+ if c, ok := t.collectionMap.Find(collection); ok {
+ return c.(*Collection).Lookup(vid)
+ }
+ }
+ return nil
+}
+
+func (t *Topology) NextVolumeId() storage.VolumeId {
+ vid := t.GetMaxVolumeId()
+ next := vid.Next()
+ go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
+ return next
+}
+
+func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
+ return vl.GetActiveVolumeCount(option) > 0
+}
+
+func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
+ if err != nil || datanodes.Length() == 0 {
+ return "", 0, nil, errors.New("No writable volumes available!")
+ }
+ fileId, count := t.Sequence.NextFileId(count)
+ return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+}
+
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
+ return t.collectionMap.Get(collectionName, func() interface{} {
+ return NewCollection(collectionName, t.volumeSizeLimit)
+ }).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
+}
+
+func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
+ c, hasCollection := t.collectionMap.Find(collectionName)
+ return c.(*Collection), hasCollection
+}
+
+func (t *Topology) DeleteCollection(collectionName string) {
+ t.collectionMap.Delete(collectionName)
+}
+
+func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
+}
+func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
+ glog.Infof("removing volume info:%+v", v)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
+}
+
+func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
+ t.Sequence.SetMax(*joinMessage.MaxFileKey)
+ dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
+ if *joinMessage.IsInit && dn != nil {
+ t.UnRegisterDataNode(dn)
+ }
+ dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
+ int(*joinMessage.Port), *joinMessage.PublicUrl,
+ int(*joinMessage.MaxVolumeCount))
+ var volumeInfos []storage.VolumeInfo
+ for _, v := range joinMessage.Volumes {
+ if vi, err := storage.NewVolumeInfo(v); err == nil {
+ volumeInfos = append(volumeInfos, vi)
+ } else {
+ glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
+ }
+ }
+ deletedVolumes := dn.UpdateVolumes(volumeInfos)
+ for _, v := range volumeInfos {
+ t.RegisterVolumeLayout(v, dn)
+ }
+ for _, v := range deletedVolumes {
+ t.UnRegisterVolumeLayout(v, dn)
+ }
+}
+
+func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ if string(dc.Id()) == dcName {
+ return dc
+ }
+ }
+ dc := NewDataCenter(dcName)
+ t.LinkChildNode(dc)
+ return dc
+}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
new file mode 100644
index 000000000..737b94482
--- /dev/null
+++ b/weed/topology/topology_event_handling.go
@@ -0,0 +1,74 @@
+package topology
+
+import (
+ "math/rand"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
+ go func() {
+ for {
+ if t.IsLeader() {
+ freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
+ t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
+ }
+ time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+ }
+ }()
+ go func(garbageThreshold string) {
+ c := time.Tick(15 * time.Minute)
+ for _ = range c {
+ if t.IsLeader() {
+ t.Vacuum(garbageThreshold)
+ }
+ }
+ }(garbageThreshold)
+ go func() {
+ for {
+ select {
+ case v := <-t.chanFullVolumes:
+ t.SetVolumeCapacityFull(v)
+ case dn := <-t.chanRecoveredDataNodes:
+ t.RegisterRecoveredDataNode(dn)
+ glog.V(0).Infoln("DataNode", dn, "is back alive!")
+ case dn := <-t.chanDeadDataNodes:
+ t.UnRegisterDataNode(dn)
+ glog.V(0).Infoln("DataNode", dn, "is dead!")
+ }
+ }
+ }()
+}
+func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
+ vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
+ if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
+ return false
+ }
+ for _, dn := range vl.vid2location[volumeInfo.Id].list {
+ if !volumeInfo.ReadOnly {
+ dn.UpAdjustActiveVolumeCountDelta(-1)
+ }
+ }
+ return true
+}
+func (t *Topology) UnRegisterDataNode(dn *DataNode) {
+ for _, v := range dn.GetVolumes() {
+ glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
+ vl.SetVolumeUnavailable(dn, v.Id)
+ }
+ dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
+ dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
+ dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
+ dn.Parent().UnlinkChildNode(dn.Id())
+}
+func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
+ for _, v := range dn.GetVolumes() {
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
+ if vl.isWritable(&v) {
+ vl.SetVolumeAvailable(dn, v.Id)
+ }
+ }
+}
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
new file mode 100644
index 000000000..ce8e9e663
--- /dev/null
+++ b/weed/topology/topology_map.go
@@ -0,0 +1,53 @@
+package topology
+
+func (t *Topology) ToMap() interface{} {
+ m := make(map[string]interface{})
+ m["Max"] = t.GetMaxVolumeCount()
+ m["Free"] = t.FreeSpace()
+ var dcs []interface{}
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ dcs = append(dcs, dc.ToMap())
+ }
+ m["DataCenters"] = dcs
+ var layouts []interface{}
+ for _, col := range t.collectionMap.Items() {
+ c := col.(*Collection)
+ for _, layout := range c.storageType2VolumeLayout.Items() {
+ if layout != nil {
+ tmp := layout.(*VolumeLayout).ToMap()
+ tmp["collection"] = c.Name
+ layouts = append(layouts, tmp)
+ }
+ }
+ }
+ m["layouts"] = layouts
+ return m
+}
+
+func (t *Topology) ToVolumeMap() interface{} {
+ m := make(map[string]interface{})
+ m["Max"] = t.GetMaxVolumeCount()
+ m["Free"] = t.FreeSpace()
+ dcs := make(map[NodeId]interface{})
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ racks := make(map[NodeId]interface{})
+ for _, r := range dc.Children() {
+ rack := r.(*Rack)
+ dataNodes := make(map[NodeId]interface{})
+ for _, d := range rack.Children() {
+ dn := d.(*DataNode)
+ var volumes []interface{}
+ for _, v := range dn.GetVolumes() {
+ volumes = append(volumes, v)
+ }
+ dataNodes[d.Id()] = volumes
+ }
+ racks[r.Id()] = dataNodes
+ }
+ dcs[dc.Id()] = racks
+ }
+ m["DataCenters"] = dcs
+ return m
+}
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
new file mode 100644
index 000000000..8cf8dfbeb
--- /dev/null
+++ b/weed/topology/topology_vacuum.go
@@ -0,0 +1,158 @@
+package topology
+
+import (
+ "encoding/json"
+ "errors"
+ "net/url"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
+ ch := make(chan bool, locationlist.Length())
+ for index, dn := range locationlist.list {
+ go func(index int, url string, vid storage.VolumeId) {
+ //glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
+ if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
+ //glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
+ ch <- false
+ } else {
+ //glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
+ ch <- ret
+ }
+ }(index, dn.Url(), vid)
+ }
+ isCheckSuccess := true
+ for _ = range locationlist.list {
+ select {
+ case canVacuum := <-ch:
+ isCheckSuccess = isCheckSuccess && canVacuum
+ case <-time.After(30 * time.Minute):
+ isCheckSuccess = false
+ break
+ }
+ }
+ return isCheckSuccess
+}
+func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+ vl.removeFromWritable(vid)
+ ch := make(chan bool, locationlist.Length())
+ for index, dn := range locationlist.list {
+ go func(index int, url string, vid storage.VolumeId) {
+ glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
+ if e := vacuumVolume_Compact(url, vid); e != nil {
+ glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
+ ch <- false
+ } else {
+ glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
+ ch <- true
+ }
+ }(index, dn.Url(), vid)
+ }
+ isVacuumSuccess := true
+ for _ = range locationlist.list {
+ select {
+ case _ = <-ch:
+ case <-time.After(30 * time.Minute):
+ isVacuumSuccess = false
+ break
+ }
+ }
+ return isVacuumSuccess
+}
+func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
+ isCommitSuccess := true
+ for _, dn := range locationlist.list {
+ glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
+ if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
+ glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
+ isCommitSuccess = false
+ } else {
+ glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
+ }
+ if isCommitSuccess {
+ vl.SetVolumeAvailable(dn, vid)
+ }
+ }
+ return isCommitSuccess
+}
+func (t *Topology) Vacuum(garbageThreshold string) int {
+ glog.V(0).Infoln("Start vacuum on demand")
+ for _, col := range t.collectionMap.Items() {
+ c := col.(*Collection)
+ glog.V(0).Infoln("vacuum on collection:", c.Name)
+ for _, vl := range c.storageType2VolumeLayout.Items() {
+ if vl != nil {
+ volumeLayout := vl.(*VolumeLayout)
+ for vid, locationlist := range volumeLayout.vid2location {
+ glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
+ if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
+ batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
+ }
+ }
+ }
+ }
+ }
+ }
+ return 0
+}
+
+type VacuumVolumeResult struct {
+ Result bool
+ Error string
+}
+
+func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("garbageThreshold", garbageThreshold)
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
+ if err != nil {
+ glog.V(0).Infoln("parameters:", values)
+ return err, false
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err, false
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error), false
+ }
+ return nil, ret.Result
+}
+func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
+ if err != nil {
+ return err
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
+func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
+ if err != nil {
+ return err
+ }
+ var ret VacuumVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go
new file mode 100644
index 000000000..3a1c9c567
--- /dev/null
+++ b/weed/topology/volume_growth.go
@@ -0,0 +1,211 @@
+package topology
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+/*
+This package is created to resolve these replica placement issues:
+1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
+2. in time of tight storage, how to reduce replica level
+3. optimizing for hot data on faster disk, cold data on cheaper storage,
+4. volume allocation for each bucket
+*/
+
+type VolumeGrowOption struct {
+ Collection string
+ ReplicaPlacement *storage.ReplicaPlacement
+ Ttl *storage.TTL
+ DataCenter string
+ Rack string
+ DataNode string
+}
+
+type VolumeGrowth struct {
+ accessLock sync.Mutex
+}
+
+func (o *VolumeGrowOption) String() string {
+ return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode)
+}
+
+func NewDefaultVolumeGrowth() *VolumeGrowth {
+ return &VolumeGrowth{}
+}
+
+// one replication type may need rp.GetCopyCount() actual volumes
+// given copyCount, how many logical volumes to create
+func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
+ switch copyCount {
+ case 1:
+ count = 7
+ case 2:
+ count = 6
+ case 3:
+ count = 3
+ default:
+ count = 1
+ }
+ return
+}
+
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
+ count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+ if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
+ return count, nil
+ }
+ return count, err
+}
+func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
+ vg.accessLock.Lock()
+ defer vg.accessLock.Unlock()
+
+ for i := 0; i < targetCount; i++ {
+ if c, e := vg.findAndGrow(topo, option); e == nil {
+ counter += c
+ } else {
+ return counter, e
+ }
+ }
+ return
+}
+
+func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+ servers, e := vg.findEmptySlotsForOneVolume(topo, option)
+ if e != nil {
+ return 0, e
+ }
+ vid := topo.NextVolumeId()
+ err := vg.grow(topo, vid, option, servers...)
+ return len(servers), err
+}
+
+// 1. find the main data node
+// 1.1 collect all data nodes that have at least one free slot
+// 1.2 collect all racks that have rp.SameRackCount+1 free slots
+// 1.3 collect all data centers that have rp.DiffRackCount+rp.SameRackCount+1 free slots
+// 2. find the rest of the data nodes
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
+ //find main datacenter and other data centers
+ rp := option.ReplicaPlacement
+ mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
+ if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
+ return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
+ }
+ if len(node.Children()) < rp.DiffRackCount+1 {
+ return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
+ }
+ if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
+ }
+ possibleRacksCount := 0
+ for _, rack := range node.Children() {
+ possibleDataNodesCount := 0
+ for _, n := range rack.Children() {
+ if n.FreeSpace() >= 1 {
+ possibleDataNodesCount++
+ }
+ }
+ if possibleDataNodesCount >= rp.SameRackCount+1 {
+ possibleRacksCount++
+ }
+ }
+ if possibleRacksCount < rp.DiffRackCount+1 {
+ return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
+ }
+ return nil
+ })
+ if dc_err != nil {
+ return nil, dc_err
+ }
+
+ //find main rack and other racks
+ mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
+ if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
+ return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
+ }
+ if node.FreeSpace() < rp.SameRackCount+1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
+ }
+ if len(node.Children()) < rp.SameRackCount+1 {
+ // a quicker way to rule out racks without enough data nodes
+ return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
+ }
+ possibleDataNodesCount := 0
+ for _, n := range node.Children() {
+ if n.FreeSpace() >= 1 {
+ possibleDataNodesCount++
+ }
+ }
+ if possibleDataNodesCount < rp.SameRackCount+1 {
+ return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
+ }
+ return nil
+ })
+ if rack_err != nil {
+ return nil, rack_err
+ }
+
+ //find main server and other servers
+ mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
+ if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
+ return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
+ }
+ if node.FreeSpace() < 1 {
+ return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
+ }
+ return nil
+ })
+ if server_err != nil {
+ return nil, server_err
+ }
+
+ servers = append(servers, mainServer.(*DataNode))
+ for _, server := range otherServers {
+ servers = append(servers, server.(*DataNode))
+ }
+ for _, rack := range otherRacks {
+ r := rand.Intn(rack.FreeSpace())
+ if server, e := rack.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
+ }
+ }
+ for _, datacenter := range otherDataCenters {
+ r := rand.Intn(datacenter.FreeSpace())
+ if server, e := datacenter.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
+ }
+ }
+ return
+}
+
+func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
+ for _, server := range servers {
+ if err := AllocateVolume(server, vid, option); err == nil {
+ vi := storage.VolumeInfo{
+ Id: vid,
+ Size: 0,
+ Collection: option.Collection,
+ ReplicaPlacement: option.ReplicaPlacement,
+ Ttl: option.Ttl,
+ Version: storage.CurrentVersion,
+ }
+ server.AddOrUpdateVolume(vi)
+ topo.RegisterVolumeLayout(vi, server)
+ glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
+ } else {
+ glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err)
+ return fmt.Errorf("Failed to assign %d: %v", vid, err)
+ }
+ }
+ return nil
+}
diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go
new file mode 100644
index 000000000..e5716674a
--- /dev/null
+++ b/weed/topology/volume_growth_test.go
@@ -0,0 +1,135 @@
+package topology
+
+import (
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/sequence"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+var topologyLayout = `
+{
+ "dc1":{
+ "rack1":{
+ "server111":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":2, "size":12312},
+ {"id":3, "size":12312}
+ ],
+ "limit":3
+ },
+ "server112":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":10
+ }
+ },
+ "rack2":{
+ "server121":{
+ "volumes":[
+ {"id":4, "size":12312},
+ {"id":5, "size":12312},
+ {"id":6, "size":12312}
+ ],
+ "limit":4
+ },
+ "server122":{
+ "volumes":[],
+ "limit":4
+ },
+ "server123":{
+ "volumes":[
+ {"id":2, "size":12312},
+ {"id":3, "size":12312},
+ {"id":4, "size":12312}
+ ],
+ "limit":5
+ }
+ }
+ },
+ "dc2":{
+ },
+ "dc3":{
+ "rack2":{
+ "server321":{
+ "volumes":[
+ {"id":1, "size":12312},
+ {"id":3, "size":12312},
+ {"id":5, "size":12312}
+ ],
+ "limit":4
+ }
+ }
+ }
+}
+`
+
+func setup(topologyLayout string) *Topology {
+ var data interface{}
+ err := json.Unmarshal([]byte(topologyLayout), &data)
+ if err != nil {
+ fmt.Println("error:", err)
+ }
+ fmt.Println("data:", data)
+
+ //need to connect all nodes first before server adding volumes
+ topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
+ sequence.NewMemorySequencer(), 32*1024, 5)
+ if err != nil {
+ panic("error: " + err.Error())
+ }
+ mTopology := data.(map[string]interface{})
+ for dcKey, dcValue := range mTopology {
+ dc := NewDataCenter(dcKey)
+ dcMap := dcValue.(map[string]interface{})
+ topo.LinkChildNode(dc)
+ for rackKey, rackValue := range dcMap {
+ rack := NewRack(rackKey)
+ rackMap := rackValue.(map[string]interface{})
+ dc.LinkChildNode(rack)
+ for serverKey, serverValue := range rackMap {
+ server := NewDataNode(serverKey)
+ serverMap := serverValue.(map[string]interface{})
+ rack.LinkChildNode(server)
+ for _, v := range serverMap["volumes"].([]interface{}) {
+ m := v.(map[string]interface{})
+ vi := storage.VolumeInfo{
+ Id: storage.VolumeId(int64(m["id"].(float64))),
+ Size: uint64(m["size"].(float64)),
+ Version: storage.CurrentVersion}
+ server.AddOrUpdateVolume(vi)
+ }
+ server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
+ }
+ }
+ }
+
+ return topo
+}
+
+func TestFindEmptySlotsForOneVolume(t *testing.T) {
+ topo := setup(topologyLayout)
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := storage.NewReplicaPlacementFromString("002")
+ volumeGrowOption := &VolumeGrowOption{
+ Collection: "",
+ ReplicaPlacement: rp,
+ DataCenter: "dc1",
+ Rack: "",
+ DataNode: "",
+ }
+ servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
+ if err != nil {
+ fmt.Println("finding empty slots error :", err)
+ t.Fail()
+ }
+ for _, server := range servers {
+ fmt.Println("assigned node :", server.Id())
+ }
+}
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
new file mode 100644
index 000000000..e500de583
--- /dev/null
+++ b/weed/topology/volume_layout.go
@@ -0,0 +1,226 @@
+package topology
+
+import (
+ "errors"
+ "fmt"
+ "math/rand"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+// mapping from volume to its locations, inverted from server to volume
+type VolumeLayout struct {
+ rp *storage.ReplicaPlacement
+ ttl *storage.TTL
+ vid2location map[storage.VolumeId]*VolumeLocationList
+ writables []storage.VolumeId // transient array of writable volume id
+ volumeSizeLimit uint64
+ accessLock sync.RWMutex
+}
+
+func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
+ return &VolumeLayout{
+ rp: rp,
+ ttl: ttl,
+ vid2location: make(map[storage.VolumeId]*VolumeLocationList),
+ writables: *new([]storage.VolumeId),
+ volumeSizeLimit: volumeSizeLimit,
+ }
+}
+
+func (vl *VolumeLayout) String() string {
+ return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
+}
+
+func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ if _, ok := vl.vid2location[v.Id]; !ok {
+ vl.vid2location[v.Id] = NewVolumeLocationList()
+ }
+ vl.vid2location[v.Id].Set(dn)
+ glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
+ if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+ vl.addToWritable(v.Id)
+ } else {
+ vl.removeFromWritable(v.Id)
+ }
+}
+
+func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ vl.removeFromWritable(v.Id)
+ delete(vl.vid2location, v.Id)
+}
+
+func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
+ for _, id := range vl.writables {
+ if vid == id {
+ return
+ }
+ }
+ vl.writables = append(vl.writables, vid)
+}
+
+func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
+ return uint64(v.Size) < vl.volumeSizeLimit &&
+ v.Version == storage.CurrentVersion &&
+ !v.ReadOnly
+}
+
+func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
+ if location := vl.vid2location[vid]; location != nil {
+ return location.list
+ }
+ return nil
+}
+
+func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
+ for _, location := range vl.vid2location {
+ nodes = append(nodes, location.list...)
+ }
+ return
+}
+
+func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
+ len_writers := len(vl.writables)
+ if len_writers <= 0 {
+ glog.V(0).Infoln("No more writable volumes!")
+ return nil, 0, nil, errors.New("No more writable volumes!")
+ }
+ if option.DataCenter == "" {
+ vid := vl.writables[rand.Intn(len_writers)]
+ locationList := vl.vid2location[vid]
+ if locationList != nil {
+ return &vid, count, locationList, nil
+ }
+ return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+ }
+ var vid storage.VolumeId
+ var locationList *VolumeLocationList
+ counter := 0
+ for _, v := range vl.writables {
+ volumeLocationList := vl.vid2location[v]
+ for _, dn := range volumeLocationList.list {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
+ counter++
+ if rand.Intn(counter) < 1 {
+ vid, locationList = v, volumeLocationList
+ }
+ }
+ }
+ }
+ return &vid, count, locationList, nil
+}
+
+func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
+ if option.DataCenter == "" {
+ return len(vl.writables)
+ }
+ counter := 0
+ for _, v := range vl.writables {
+ for _, dn := range vl.vid2location[v].list {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
+ counter++
+ }
+ }
+ }
+ return counter
+}
+
+func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
+ toDeleteIndex := -1
+ for k, id := range vl.writables {
+ if id == vid {
+ toDeleteIndex = k
+ break
+ }
+ }
+ if toDeleteIndex >= 0 {
+ glog.V(0).Infoln("Volume", vid, "becomes unwritable")
+ vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...)
+ return true
+ }
+ return false
+}
+func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
+ for _, v := range vl.writables {
+ if v == vid {
+ return false
+ }
+ }
+ glog.V(0).Infoln("Volume", vid, "becomes writable")
+ vl.writables = append(vl.writables, vid)
+ return true
+}
+
+func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ if location, ok := vl.vid2location[vid]; ok {
+ if location.Remove(dn) {
+ if location.Length() < vl.rp.GetCopyCount() {
+ glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
+ return vl.removeFromWritable(vid)
+ }
+ }
+ }
+ return false
+}
+func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ vl.vid2location[vid].Set(dn)
+ if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+ return vl.setVolumeWritable(vid)
+ }
+ return false
+}
+
+func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ // glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
+ return vl.removeFromWritable(vid)
+}
+
+func (vl *VolumeLayout) ToMap() map[string]interface{} {
+ m := make(map[string]interface{})
+ m["replication"] = vl.rp.String()
+ m["ttl"] = vl.ttl.String()
+ m["writables"] = vl.writables
+ //m["locations"] = vl.vid2location
+ return m
+}
diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go
new file mode 100644
index 000000000..d5eaf5e92
--- /dev/null
+++ b/weed/topology/volume_location_list.go
@@ -0,0 +1,65 @@
+package topology
+
+import (
+ "fmt"
+)
+
+type VolumeLocationList struct {
+ list []*DataNode
+}
+
+func NewVolumeLocationList() *VolumeLocationList {
+ return &VolumeLocationList{}
+}
+
+func (dnll *VolumeLocationList) String() string {
+ return fmt.Sprintf("%v", dnll.list)
+}
+
+func (dnll *VolumeLocationList) Head() *DataNode {
+ //mark first node as master volume
+ return dnll.list[0]
+}
+
+func (dnll *VolumeLocationList) Length() int {
+ return len(dnll.list)
+}
+
+func (dnll *VolumeLocationList) Set(loc *DataNode) {
+ for i := 0; i < len(dnll.list); i++ {
+ if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
+ dnll.list[i] = loc
+ return
+ }
+ }
+ dnll.list = append(dnll.list, loc)
+}
+
+func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
+ for i, dnl := range dnll.list {
+ if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
+ dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
+ return true
+ }
+ }
+ return false
+}
+
+func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
+ var changed bool
+ for _, dnl := range dnll.list {
+ if dnl.LastSeen < freshThreshHold {
+ changed = true
+ break
+ }
+ }
+ if changed {
+ var l []*DataNode
+ for _, dnl := range dnll.list {
+ if dnl.LastSeen >= freshThreshHold {
+ l = append(l, dnl)
+ }
+ }
+ dnll.list = l
+ }
+}
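A small sketch of VolumeLocationList's set semantics (addresses are hypothetical): Set de-duplicates on ip:port, so re-adding a known data node replaces the entry in place rather than growing the list.

package main

import (
    "fmt"

    "github.com/chrislusf/seaweedfs/weed/topology"
)

func main() {
    list := topology.NewVolumeLocationList()

    a := topology.NewDataNode("127.0.0.1:8080")
    a.Ip, a.Port = "127.0.0.1", 8080
    b := topology.NewDataNode("127.0.0.1:8081")
    b.Ip, b.Port = "127.0.0.1", 8081

    list.Set(a)
    list.Set(b)
    list.Set(a)                    // same ip:port, replaces the existing entry
    fmt.Println(list.Length())     // 2

    list.Remove(b)
    fmt.Println(list.Length())     // 1
    fmt.Println(list.Head().Url()) // 127.0.0.1:8080
}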