Diffstat (limited to 'weed/topology')
| -rw-r--r-- | weed/topology/allocate_volume.go | 35 |
| -rw-r--r-- | weed/topology/cluster_commands.go | 31 |
| -rw-r--r-- | weed/topology/collection.go | 57 |
| -rw-r--r-- | weed/topology/configuration.go | 65 |
| -rw-r--r-- | weed/topology/configuration_test.go | 42 |
| -rw-r--r-- | weed/topology/data_center.go | 40 |
| -rw-r--r-- | weed/topology/data_node.go | 115 |
| -rw-r--r-- | weed/topology/node.go | 272 |
| -rw-r--r-- | weed/topology/rack.go | 65 |
| -rw-r--r-- | weed/topology/store_replicate.go | 150 |
| -rw-r--r-- | weed/topology/topo_test.go | 17 |
| -rw-r--r-- | weed/topology/topology.go | 189 |
| -rw-r--r-- | weed/topology/topology_event_handling.go | 74 |
| -rw-r--r-- | weed/topology/topology_map.go | 53 |
| -rw-r--r-- | weed/topology/topology_vacuum.go | 158 |
| -rw-r--r-- | weed/topology/volume_growth.go | 211 |
| -rw-r--r-- | weed/topology/volume_growth_test.go | 135 |
| -rw-r--r-- | weed/topology/volume_layout.go | 226 |
| -rw-r--r-- | weed/topology/volume_location_list.go | 65 |
19 files changed, 2000 insertions, 0 deletions
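The diff below introduces the whole topology package at once: a tree of Topology → DataCenter → Rack → DataNode nodes whose capacity counters roll up through the UpAdjust* calls, plus the volume growth, layout, vacuum, and replication helpers built on top of it. As a quick orientation, here is a minimal sketch of wiring that tree together, closely following the setup() helper in volume_growth_test.go; the main package, the conf path, the IP/port, and the volume limit are illustrative values, and the import paths are assumed to match the ones used inside the new files.

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/sequence"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/topology"
)

func main() {
	// Build an in-memory topology; if the conf file is missing, NewTopology
	// logs a notice and falls back to the default configuration.
	topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
		sequence.NewMemorySequencer(), 32*1024*1024, 5)
	if err != nil {
		panic(err)
	}

	// Topology -> DataCenter -> Rack -> DataNode, much as ProcessJoinMessage
	// in topology.go does when a volume server heartbeats in.
	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	dn := rack.GetOrCreateDataNode("10.0.0.1", 8080, "10.0.0.1:8080", 7) // illustrative node

	// Registering a volume on the leaf adjusts counters all the way up the tree.
	dn.AddOrUpdateVolume(storage.VolumeInfo{
		Id:      storage.VolumeId(1),
		Size:    0,
		Version: storage.CurrentVersion,
	})

	fmt.Println("max volumes:", topo.GetMaxVolumeCount()) // 7
	fmt.Println("free slots :", topo.FreeSpace())         // 6
	fmt.Println("data node  :", dn.Url())                 // 10.0.0.1:8080
}
```

GrowByCountAndType and PickForWrite then operate on this tree; actually creating volumes needs reachable volume servers, since AllocateVolume issues an HTTP /admin/assign_volume request to each chosen data node.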
diff --git a/weed/topology/allocate_volume.go b/weed/topology/allocate_volume.go new file mode 100644 index 000000000..7b267a805 --- /dev/null +++ b/weed/topology/allocate_volume.go @@ -0,0 +1,35 @@ +package topology + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type AllocateVolumeResult struct { + Error string +} + +func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error { + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("collection", option.Collection) + values.Add("replication", option.ReplicaPlacement.String()) + values.Add("ttl", option.Ttl.String()) + jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values) + if err != nil { + return err + } + var ret AllocateVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return fmt.Errorf("Invalid JSON result for %s: %s", "/admin/assign_volum", string(jsonBlob)) + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +} diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go new file mode 100644 index 000000000..53f45ec4d --- /dev/null +++ b/weed/topology/cluster_commands.go @@ -0,0 +1,31 @@ +package topology + +import ( + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +type MaxVolumeIdCommand struct { + MaxVolumeId storage.VolumeId `json:"maxVolumeId"` +} + +func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand { + return &MaxVolumeIdCommand{ + MaxVolumeId: value, + } +} + +func (c *MaxVolumeIdCommand) CommandName() string { + return "MaxVolumeId" +} + +func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { + topo := server.Context().(*Topology) + before := topo.GetMaxVolumeId() + topo.UpAdjustMaxVolumeId(c.MaxVolumeId) + + glog.V(4).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId()) + + return nil, nil +} diff --git a/weed/topology/collection.go b/weed/topology/collection.go new file mode 100644 index 000000000..a17f0c961 --- /dev/null +++ b/weed/topology/collection.go @@ -0,0 +1,57 @@ +package topology + +import ( + "fmt" + + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type Collection struct { + Name string + volumeSizeLimit uint64 + storageType2VolumeLayout *util.ConcurrentReadMap +} + +func NewCollection(name string, volumeSizeLimit uint64) *Collection { + c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} + c.storageType2VolumeLayout = util.NewConcurrentReadMap() + return c +} + +func (c *Collection) String() string { + return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) +} + +func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { + keyString := rp.String() + if ttl != nil { + keyString += ttl.String() + } + vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { + return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + }) + return vl.(*VolumeLayout) +} + +func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { + for _, vl := range c.storageType2VolumeLayout.Items() { + if vl != nil { + if list := vl.(*VolumeLayout).Lookup(vid); list != nil { + return list + } + } + } + return nil +} + +func (c *Collection) ListVolumeServers() (nodes 
[]*DataNode) { + for _, vl := range c.storageType2VolumeLayout.Items() { + if vl != nil { + if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { + nodes = append(nodes, list...) + } + } + } + return +} diff --git a/weed/topology/configuration.go b/weed/topology/configuration.go new file mode 100644 index 000000000..ffcebb59c --- /dev/null +++ b/weed/topology/configuration.go @@ -0,0 +1,65 @@ +package topology + +import ( + "encoding/xml" +) + +type loc struct { + dcName string + rackName string +} +type rack struct { + Name string `xml:"name,attr"` + Ips []string `xml:"Ip"` +} +type dataCenter struct { + Name string `xml:"name,attr"` + Racks []rack `xml:"Rack"` +} +type topology struct { + DataCenters []dataCenter `xml:"DataCenter"` +} +type Configuration struct { + XMLName xml.Name `xml:"Configuration"` + Topo topology `xml:"Topology"` + ip2location map[string]loc +} + +func NewConfiguration(b []byte) (*Configuration, error) { + c := &Configuration{} + err := xml.Unmarshal(b, c) + c.ip2location = make(map[string]loc) + for _, dc := range c.Topo.DataCenters { + for _, rack := range dc.Racks { + for _, ip := range rack.Ips { + c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name} + } + } + } + return c, err +} + +func (c *Configuration) String() string { + if b, e := xml.MarshalIndent(c, " ", " "); e == nil { + return string(b) + } + return "" +} + +func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) { + if c != nil && c.ip2location != nil { + if loc, ok := c.ip2location[ip]; ok { + return loc.dcName, loc.rackName + } + } + + if dcName == "" { + dcName = "DefaultDataCenter" + } + + if rackName == "" { + rackName = "DefaultRack" + } + + return dcName, rackName +} diff --git a/weed/topology/configuration_test.go b/weed/topology/configuration_test.go new file mode 100644 index 000000000..0a353d16e --- /dev/null +++ b/weed/topology/configuration_test.go @@ -0,0 +1,42 @@ +package topology + +import ( + "fmt" + "testing" +) + +func TestLoadConfiguration(t *testing.T) { + + confContent := ` + +<?xml version="1.0" encoding="UTF-8" ?> +<Configuration> + <Topology> + <DataCenter name="dc1"> + <Rack name="rack1"> + <Ip>192.168.1.1</Ip> + </Rack> + </DataCenter> + <DataCenter name="dc2"> + <Rack name="rack1"> + <Ip>192.168.1.2</Ip> + </Rack> + <Rack name="rack2"> + <Ip>192.168.1.3</Ip> + <Ip>192.168.1.4</Ip> + </Rack> + </DataCenter> + </Topology> +</Configuration> +` + c, err := NewConfiguration([]byte(confContent)) + + fmt.Printf("%s\n", c) + if err != nil { + t.Fatalf("unmarshal error:%v", err) + } + + if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" { + t.Fatalf("unmarshal error:%s", c) + } +} diff --git a/weed/topology/data_center.go b/weed/topology/data_center.go new file mode 100644 index 000000000..bcf2dfd31 --- /dev/null +++ b/weed/topology/data_center.go @@ -0,0 +1,40 @@ +package topology + +type DataCenter struct { + NodeImpl +} + +func NewDataCenter(id string) *DataCenter { + dc := &DataCenter{} + dc.id = NodeId(id) + dc.nodeType = "DataCenter" + dc.children = make(map[NodeId]Node) + dc.NodeImpl.value = dc + return dc +} + +func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack { + for _, c := range dc.Children() { + rack := c.(*Rack) + if string(rack.Id()) == rackName { + return rack + } + } + rack := NewRack(rackName) + dc.LinkChildNode(rack) + return rack +} + +func (dc *DataCenter) ToMap() interface{} { + m := make(map[string]interface{}) + m["Id"] = dc.Id() + m["Max"] = dc.GetMaxVolumeCount() 
+ m["Free"] = dc.FreeSpace() + var racks []interface{} + for _, c := range dc.Children() { + rack := c.(*Rack) + racks = append(racks, rack.ToMap()) + } + m["Racks"] = racks + return m +} diff --git a/weed/topology/data_node.go b/weed/topology/data_node.go new file mode 100644 index 000000000..1404d4aa8 --- /dev/null +++ b/weed/topology/data_node.go @@ -0,0 +1,115 @@ +package topology + +import ( + "fmt" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +type DataNode struct { + NodeImpl + volumes map[storage.VolumeId]storage.VolumeInfo + Ip string + Port int + PublicUrl string + LastSeen int64 // unix time in seconds + Dead bool +} + +func NewDataNode(id string) *DataNode { + s := &DataNode{} + s.id = NodeId(id) + s.nodeType = "DataNode" + s.volumes = make(map[storage.VolumeId]storage.VolumeInfo) + s.NodeImpl.value = s + return s +} + +func (dn *DataNode) String() string { + dn.RLock() + defer dn.RUnlock() + return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) +} + +func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { + dn.Lock() + defer dn.Unlock() + if _, ok := dn.volumes[v.Id]; !ok { + dn.volumes[v.Id] = v + dn.UpAdjustVolumeCountDelta(1) + if !v.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(1) + } + dn.UpAdjustMaxVolumeId(v.Id) + } else { + dn.volumes[v.Id] = v + } +} + +func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) { + actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo) + for _, v := range actualVolumes { + actualVolumeMap[v.Id] = v + } + dn.RLock() + for vid, v := range dn.volumes { + if _, ok := actualVolumeMap[vid]; !ok { + glog.V(0).Infoln("Deleting volume id:", vid) + delete(dn.volumes, vid) + deletedVolumes = append(deletedVolumes, v) + dn.UpAdjustVolumeCountDelta(-1) + dn.UpAdjustActiveVolumeCountDelta(-1) + } + } //TODO: adjust max volume id, if need to reclaim volume ids + dn.RUnlock() + for _, v := range actualVolumes { + dn.AddOrUpdateVolume(v) + } + return +} + +func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) { + dn.RLock() + for _, v := range dn.volumes { + ret = append(ret, v) + } + dn.RUnlock() + return ret +} + +func (dn *DataNode) GetDataCenter() *DataCenter { + return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter) +} + +func (dn *DataNode) GetRack() *Rack { + return dn.Parent().(*NodeImpl).value.(*Rack) +} + +func (dn *DataNode) GetTopology() *Topology { + p := dn.Parent() + for p.Parent() != nil { + p = p.Parent() + } + t := p.(*Topology) + return t +} + +func (dn *DataNode) MatchLocation(ip string, port int) bool { + return dn.Ip == ip && dn.Port == port +} + +func (dn *DataNode) Url() string { + return dn.Ip + ":" + strconv.Itoa(dn.Port) +} + +func (dn *DataNode) ToMap() interface{} { + ret := make(map[string]interface{}) + ret["Url"] = dn.Url() + ret["Volumes"] = dn.GetVolumeCount() + ret["Max"] = dn.GetMaxVolumeCount() + ret["Free"] = dn.FreeSpace() + ret["PublicUrl"] = dn.PublicUrl + return ret +} diff --git a/weed/topology/node.go b/weed/topology/node.go new file mode 100644 index 000000000..4ce35f4b0 --- /dev/null +++ b/weed/topology/node.go @@ -0,0 +1,272 @@ +package topology + +import ( + "errors" + "math/rand" + "strings" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +type NodeId string +type Node interface { + Id() NodeId + 
String() string + FreeSpace() int + ReserveOneVolume(r int) (*DataNode, error) + UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) + UpAdjustVolumeCountDelta(volumeCountDelta int) + UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) + UpAdjustMaxVolumeId(vid storage.VolumeId) + + GetVolumeCount() int + GetActiveVolumeCount() int + GetMaxVolumeCount() int + GetMaxVolumeId() storage.VolumeId + SetParent(Node) + LinkChildNode(node Node) + UnlinkChildNode(nodeId NodeId) + CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) + + IsDataNode() bool + IsRack() bool + IsDataCenter() bool + Children() []Node + Parent() Node + + GetValue() interface{} //get reference to the topology,dc,rack,datanode +} +type NodeImpl struct { + id NodeId + volumeCount int + activeVolumeCount int + maxVolumeCount int + parent Node + sync.RWMutex // lock children + children map[NodeId]Node + maxVolumeId storage.VolumeId + + //for rack, data center, topology + nodeType string + value interface{} +} + +// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot +func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) { + candidates := make([]Node, 0, len(n.children)) + var errs []string + n.RLock() + for _, node := range n.children { + if err := filterFirstNodeFn(node); err == nil { + candidates = append(candidates, node) + } else { + errs = append(errs, string(node.Id())+":"+err.Error()) + } + } + n.RUnlock() + if len(candidates) == 0 { + return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n")) + } + firstNode = candidates[rand.Intn(len(candidates))] + glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id()) + + restNodes = make([]Node, numberOfNodes-1) + candidates = candidates[:0] + n.RLock() + for _, node := range n.children { + if node.Id() == firstNode.Id() { + continue + } + if node.FreeSpace() <= 0 { + continue + } + glog.V(2).Infoln("select rest node candidate:", node.Id()) + candidates = append(candidates, node) + } + n.RUnlock() + glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates") + ret := len(restNodes) == 0 + for k, node := range candidates { + if k < len(restNodes) { + restNodes[k] = node + if k == len(restNodes)-1 { + ret = true + } + } else { + r := rand.Intn(k + 1) + if r < len(restNodes) { + restNodes[r] = node + } + } + } + if !ret { + glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates") + err = errors.New("Not enough data node found!") + } + return +} + +func (n *NodeImpl) IsDataNode() bool { + return n.nodeType == "DataNode" +} +func (n *NodeImpl) IsRack() bool { + return n.nodeType == "Rack" +} +func (n *NodeImpl) IsDataCenter() bool { + return n.nodeType == "DataCenter" +} +func (n *NodeImpl) String() string { + if n.parent != nil { + return n.parent.String() + ":" + string(n.id) + } + return string(n.id) +} +func (n *NodeImpl) Id() NodeId { + return n.id +} +func (n *NodeImpl) FreeSpace() int { + return n.maxVolumeCount - n.volumeCount +} +func (n *NodeImpl) SetParent(node Node) { + n.parent = node +} +func (n *NodeImpl) Children() (ret []Node) { + n.RLock() + defer n.RUnlock() + for _, c := range n.children { + ret = append(ret, c) + } + return ret +} +func (n *NodeImpl) Parent() Node { + return n.parent +} +func (n *NodeImpl) GetValue() interface{} { + return n.value +} +func (n *NodeImpl) 
ReserveOneVolume(r int) (assignedNode *DataNode, err error) { + n.RLock() + defer n.RUnlock() + for _, node := range n.children { + freeSpace := node.FreeSpace() + // fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace) + if freeSpace <= 0 { + continue + } + if r >= freeSpace { + r -= freeSpace + } else { + if node.IsDataNode() && node.FreeSpace() > 0 { + // fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace()) + return node.(*DataNode), nil + } + assignedNode, err = node.ReserveOneVolume(r) + if err != nil { + return + } + } + } + return +} + +func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative + n.maxVolumeCount += maxVolumeCountDelta + if n.parent != nil { + n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative + n.volumeCount += volumeCountDelta + if n.parent != nil { + n.parent.UpAdjustVolumeCountDelta(volumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative + n.activeVolumeCount += activeVolumeCountDelta + if n.parent != nil { + n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta) + } +} +func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative + if n.maxVolumeId < vid { + n.maxVolumeId = vid + if n.parent != nil { + n.parent.UpAdjustMaxVolumeId(vid) + } + } +} +func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId { + return n.maxVolumeId +} +func (n *NodeImpl) GetVolumeCount() int { + return n.volumeCount +} +func (n *NodeImpl) GetActiveVolumeCount() int { + return n.activeVolumeCount +} +func (n *NodeImpl) GetMaxVolumeCount() int { + return n.maxVolumeCount +} + +func (n *NodeImpl) LinkChildNode(node Node) { + n.Lock() + defer n.Unlock() + if n.children[node.Id()] == nil { + n.children[node.Id()] = node + n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount()) + n.UpAdjustMaxVolumeId(node.GetMaxVolumeId()) + n.UpAdjustVolumeCountDelta(node.GetVolumeCount()) + n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount()) + node.SetParent(n) + glog.V(0).Infoln(n, "adds child", node.Id()) + } +} + +func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) { + n.Lock() + defer n.Unlock() + node := n.children[nodeId] + if node != nil { + node.SetParent(nil) + delete(n.children, node.Id()) + n.UpAdjustVolumeCountDelta(-node.GetVolumeCount()) + n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount()) + n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount()) + glog.V(0).Infoln(n, "removes", node, "volumeCount =", n.activeVolumeCount) + } +} + +func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) { + if n.IsRack() { + for _, c := range n.Children() { + dn := c.(*DataNode) //can not cast n to DataNode + if dn.LastSeen < freshThreshHold { + if !dn.Dead { + dn.Dead = true + n.GetTopology().chanDeadDataNodes <- dn + } + } + for _, v := range dn.GetVolumes() { + if uint64(v.Size) >= volumeSizeLimit { + //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit) + n.GetTopology().chanFullVolumes <- v + } + } + } + } else { + for _, c := range n.Children() { + c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit) + } + } +} + +func (n *NodeImpl) GetTopology() *Topology { + var p Node + p = n + for p.Parent() != nil { + p = p.Parent() + } + return p.GetValue().(*Topology) +} diff --git a/weed/topology/rack.go b/weed/topology/rack.go new file mode 100644 
index 000000000..1ca2f8de8 --- /dev/null +++ b/weed/topology/rack.go @@ -0,0 +1,65 @@ +package topology + +import ( + "strconv" + "time" +) + +type Rack struct { + NodeImpl +} + +func NewRack(id string) *Rack { + r := &Rack{} + r.id = NodeId(id) + r.nodeType = "Rack" + r.children = make(map[NodeId]Node) + r.NodeImpl.value = r + return r +} + +func (r *Rack) FindDataNode(ip string, port int) *DataNode { + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + return dn + } + } + return nil +} +func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode { + for _, c := range r.Children() { + dn := c.(*DataNode) + if dn.MatchLocation(ip, port) { + dn.LastSeen = time.Now().Unix() + if dn.Dead { + dn.Dead = false + r.GetTopology().chanRecoveredDataNodes <- dn + dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount) + } + return dn + } + } + dn := NewDataNode(ip + ":" + strconv.Itoa(port)) + dn.Ip = ip + dn.Port = port + dn.PublicUrl = publicUrl + dn.maxVolumeCount = maxVolumeCount + dn.LastSeen = time.Now().Unix() + r.LinkChildNode(dn) + return dn +} + +func (r *Rack) ToMap() interface{} { + m := make(map[string]interface{}) + m["Id"] = r.Id() + m["Max"] = r.GetMaxVolumeCount() + m["Free"] = r.FreeSpace() + var dns []interface{} + for _, c := range r.Children() { + dn := c.(*DataNode) + dns = append(dns, dn.ToMap()) + } + m["DataNodes"] = dns + return m +} diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go new file mode 100644 index 000000000..be5777167 --- /dev/null +++ b/weed/topology/store_replicate.go @@ -0,0 +1,150 @@ +package topology + +import ( + "bytes" + "errors" + "fmt" + "net/http" + "strconv" + "strings" + + "net/url" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func ReplicatedWrite(masterNode string, s *storage.Store, + volumeId storage.VolumeId, needle *storage.Needle, + r *http.Request) (size uint32, errorStatus string) { + + //check JWT + jwt := security.GetJwt(r) + + ret, err := s.Write(volumeId, needle) + needToReplicate := !s.HasVolume(volumeId) + if err != nil { + errorStatus = "Failed to write to local disk (" + err.Error() + ")" + } else if ret > 0 { + needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() + } else { + errorStatus = "Failed to write to local disk" + } + if !needToReplicate && ret > 0 { + needToReplicate = s.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "replicate" { + + if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error { + u := url.URL{ + Scheme: "http", + Host: location.Url, + Path: r.URL.Path, + } + q := url.Values{ + "type": {"replicate"}, + } + if needle.LastModified > 0 { + q.Set("ts", strconv.FormatUint(needle.LastModified, 10)) + } + if needle.IsChunkedManifest() { + q.Set("cm", "true") + } + u.RawQuery = q.Encode() + _, err := operation.Upload(u.String(), + string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime), + jwt) + return err + }); err != nil { + ret = 0 + errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err) + } + } + } + size = ret + return +} + +func ReplicatedDelete(masterNode string, store *storage.Store, 
+ volumeId storage.VolumeId, n *storage.Needle, + r *http.Request) (uint32, error) { + + //check JWT + jwt := security.GetJwt(r) + + ret, err := store.Delete(volumeId, n) + if err != nil { + glog.V(0).Infoln("delete error:", err) + return ret, err + } + + needToReplicate := !store.HasVolume(volumeId) + if !needToReplicate && ret > 0 { + needToReplicate = store.GetVolume(volumeId).NeedToReplicate() + } + if needToReplicate { //send to other replica locations + if r.FormValue("type") != "replicate" { + if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error { + return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt) + }); err != nil { + ret = 0 + } + } + } + return ret, err +} + +type DistributedOperationResult map[string]error + +func (dr DistributedOperationResult) Error() error { + var errs []string + for k, v := range dr { + if v != nil { + errs = append(errs, fmt.Sprintf("[%s]: %v", k, v)) + } + } + if len(errs) == 0 { + return nil + } + return errors.New(strings.Join(errs, "\n")) +} + +type RemoteResult struct { + Host string + Error error +} + +func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error { + if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { + length := 0 + selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) + results := make(chan RemoteResult) + for _, location := range lookupResult.Locations { + if location.Url != selfUrl { + length++ + go func(location operation.Location, results chan RemoteResult) { + results <- RemoteResult{location.Url, op(location)} + }(location, results) + } + } + ret := DistributedOperationResult(make(map[string]error)) + for i := 0; i < length; i++ { + result := <-results + ret[result.Host] = result.Error + } + if volume := store.GetVolume(volumeId); volume != nil { + if length+1 < volume.ReplicaPlacement.GetCopyCount() { + return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount()) + } + } + return ret.Error() + } else { + glog.V(0).Infoln() + return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr) + } + return nil +} diff --git a/weed/topology/topo_test.go b/weed/topology/topo_test.go new file mode 100644 index 000000000..9a0dbc6b8 --- /dev/null +++ b/weed/topology/topo_test.go @@ -0,0 +1,17 @@ +package topology + +import ( + "testing" +) + +func TestRemoveDataCenter(t *testing.T) { + topo := setup(topologyLayout) + topo.UnlinkChildNode(NodeId("dc2")) + if topo.GetActiveVolumeCount() != 15 { + t.Fail() + } + topo.UnlinkChildNode(NodeId("dc3")) + if topo.GetActiveVolumeCount() != 12 { + t.Fail() + } +} diff --git a/weed/topology/topology.go b/weed/topology/topology.go new file mode 100644 index 000000000..04b500053 --- /dev/null +++ b/weed/topology/topology.go @@ -0,0 +1,189 @@ +package topology + +import ( + "errors" + "io/ioutil" + "math/rand" + + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/sequence" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type Topology struct { + NodeImpl + + collectionMap *util.ConcurrentReadMap + + pulse int64 + + volumeSizeLimit uint64 + + Sequence sequence.Sequencer + + chanDeadDataNodes chan *DataNode + chanRecoveredDataNodes chan *DataNode + 
chanFullVolumes chan storage.VolumeInfo + + configuration *Configuration + + RaftServer raft.Server +} + +func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) { + t := &Topology{} + t.id = NodeId(id) + t.nodeType = "Topology" + t.NodeImpl.value = t + t.children = make(map[NodeId]Node) + t.collectionMap = util.NewConcurrentReadMap() + t.pulse = int64(pulse) + t.volumeSizeLimit = volumeSizeLimit + + t.Sequence = seq + + t.chanDeadDataNodes = make(chan *DataNode) + t.chanRecoveredDataNodes = make(chan *DataNode) + t.chanFullVolumes = make(chan storage.VolumeInfo) + + err := t.loadConfiguration(confFile) + + return t, err +} + +func (t *Topology) IsLeader() bool { + if leader, e := t.Leader(); e == nil { + return leader == t.RaftServer.Name() + } + return false +} + +func (t *Topology) Leader() (string, error) { + l := "" + if t.RaftServer != nil { + l = t.RaftServer.Leader() + } else { + return "", errors.New("Raft Server not ready yet!") + } + + if l == "" { + // We are a single node cluster, we are the leader + return t.RaftServer.Name(), errors.New("Raft Server not initialized!") + } + + return l, nil +} + +func (t *Topology) loadConfiguration(configurationFile string) error { + b, e := ioutil.ReadFile(configurationFile) + if e == nil { + t.configuration, e = NewConfiguration(b) + return e + } + glog.V(0).Infoln("Using default configurations.") + return nil +} + +func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { + //maybe an issue if lots of collections? + if collection == "" { + for _, c := range t.collectionMap.Items() { + if list := c.(*Collection).Lookup(vid); list != nil { + return list + } + } + } else { + if c, ok := t.collectionMap.Find(collection); ok { + return c.(*Collection).Lookup(vid) + } + } + return nil +} + +func (t *Topology) NextVolumeId() storage.VolumeId { + vid := t.GetMaxVolumeId() + next := vid.Next() + go t.RaftServer.Do(NewMaxVolumeIdCommand(next)) + return next +} + +func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { + vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + return vl.GetActiveVolumeCount(option) > 0 +} + +func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) { + vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option) + if err != nil || datanodes.Length() == 0 { + return "", 0, nil, errors.New("No writable volumes available!") + } + fileId, count := t.Sequence.NextFileId(count) + return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil +} + +func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { + return t.collectionMap.Get(collectionName, func() interface{} { + return NewCollection(collectionName, t.volumeSizeLimit) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) +} + +func (t *Topology) FindCollection(collectionName string) (*Collection, bool) { + c, hasCollection := t.collectionMap.Find(collectionName) + return c.(*Collection), hasCollection +} + +func (t *Topology) DeleteCollection(collectionName string) { + t.collectionMap.Delete(collectionName) +} + +func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn) +} +func (t *Topology) UnRegisterVolumeLayout(v 
storage.VolumeInfo, dn *DataNode) { + glog.Infof("removing volume info:%+v", v) + t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn) +} + +func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) { + t.Sequence.SetMax(*joinMessage.MaxFileKey) + dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack) + dc := t.GetOrCreateDataCenter(dcName) + rack := dc.GetOrCreateRack(rackName) + dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port)) + if *joinMessage.IsInit && dn != nil { + t.UnRegisterDataNode(dn) + } + dn = rack.GetOrCreateDataNode(*joinMessage.Ip, + int(*joinMessage.Port), *joinMessage.PublicUrl, + int(*joinMessage.MaxVolumeCount)) + var volumeInfos []storage.VolumeInfo + for _, v := range joinMessage.Volumes { + if vi, err := storage.NewVolumeInfo(v); err == nil { + volumeInfos = append(volumeInfos, vi) + } else { + glog.V(0).Infoln("Fail to convert joined volume information:", err.Error()) + } + } + deletedVolumes := dn.UpdateVolumes(volumeInfos) + for _, v := range volumeInfos { + t.RegisterVolumeLayout(v, dn) + } + for _, v := range deletedVolumes { + t.UnRegisterVolumeLayout(v, dn) + } +} + +func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter { + for _, c := range t.Children() { + dc := c.(*DataCenter) + if string(dc.Id()) == dcName { + return dc + } + } + dc := NewDataCenter(dcName) + t.LinkChildNode(dc) + return dc +} diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go new file mode 100644 index 000000000..737b94482 --- /dev/null +++ b/weed/topology/topology_event_handling.go @@ -0,0 +1,74 @@ +package topology + +import ( + "math/rand" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { + go func() { + for { + if t.IsLeader() { + freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval + t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit) + } + time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + go func(garbageThreshold string) { + c := time.Tick(15 * time.Minute) + for _ = range c { + if t.IsLeader() { + t.Vacuum(garbageThreshold) + } + } + }(garbageThreshold) + go func() { + for { + select { + case v := <-t.chanFullVolumes: + t.SetVolumeCapacityFull(v) + case dn := <-t.chanRecoveredDataNodes: + t.RegisterRecoveredDataNode(dn) + glog.V(0).Infoln("DataNode", dn, "is back alive!") + case dn := <-t.chanDeadDataNodes: + t.UnRegisterDataNode(dn) + glog.V(0).Infoln("DataNode", dn, "is dead!") + } + } + }() +} +func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool { + vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl) + if !vl.SetVolumeCapacityFull(volumeInfo.Id) { + return false + } + for _, dn := range vl.vid2location[volumeInfo.Id].list { + if !volumeInfo.ReadOnly { + dn.UpAdjustActiveVolumeCountDelta(-1) + } + } + return true +} +func (t *Topology) UnRegisterDataNode(dn *DataNode) { + for _, v := range dn.GetVolumes() { + glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn) + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + vl.SetVolumeUnavailable(dn, v.Id) + } + dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount()) + dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount()) + 
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount()) + dn.Parent().UnlinkChildNode(dn.Id()) +} +func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) { + for _, v := range dn.GetVolumes() { + vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl) + if vl.isWritable(&v) { + vl.SetVolumeAvailable(dn, v.Id) + } + } +} diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go new file mode 100644 index 000000000..ce8e9e663 --- /dev/null +++ b/weed/topology/topology_map.go @@ -0,0 +1,53 @@ +package topology + +func (t *Topology) ToMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = t.GetMaxVolumeCount() + m["Free"] = t.FreeSpace() + var dcs []interface{} + for _, c := range t.Children() { + dc := c.(*DataCenter) + dcs = append(dcs, dc.ToMap()) + } + m["DataCenters"] = dcs + var layouts []interface{} + for _, col := range t.collectionMap.Items() { + c := col.(*Collection) + for _, layout := range c.storageType2VolumeLayout.Items() { + if layout != nil { + tmp := layout.(*VolumeLayout).ToMap() + tmp["collection"] = c.Name + layouts = append(layouts, tmp) + } + } + } + m["layouts"] = layouts + return m +} + +func (t *Topology) ToVolumeMap() interface{} { + m := make(map[string]interface{}) + m["Max"] = t.GetMaxVolumeCount() + m["Free"] = t.FreeSpace() + dcs := make(map[NodeId]interface{}) + for _, c := range t.Children() { + dc := c.(*DataCenter) + racks := make(map[NodeId]interface{}) + for _, r := range dc.Children() { + rack := r.(*Rack) + dataNodes := make(map[NodeId]interface{}) + for _, d := range rack.Children() { + dn := d.(*DataNode) + var volumes []interface{} + for _, v := range dn.GetVolumes() { + volumes = append(volumes, v) + } + dataNodes[d.Id()] = volumes + } + racks[r.Id()] = dataNodes + } + dcs[dc.Id()] = racks + } + m["DataCenters"] = dcs + return m +} diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go new file mode 100644 index 000000000..8cf8dfbeb --- /dev/null +++ b/weed/topology/topology_vacuum.go @@ -0,0 +1,158 @@ +package topology + +import ( + "encoding/json" + "errors" + "net/url" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { + ch := make(chan bool, locationlist.Length()) + for index, dn := range locationlist.list { + go func(index int, url string, vid storage.VolumeId) { + //glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url()) + if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil { + //glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e) + ch <- false + } else { + //glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret) + ch <- ret + } + }(index, dn.Url(), vid) + } + isCheckSuccess := true + for _ = range locationlist.list { + select { + case canVacuum := <-ch: + isCheckSuccess = isCheckSuccess && canVacuum + case <-time.After(30 * time.Minute): + isCheckSuccess = false + break + } + } + return isCheckSuccess +} +func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { + vl.removeFromWritable(vid) + ch := make(chan bool, locationlist.Length()) + for index, dn := range locationlist.list { + go func(index int, url string, vid storage.VolumeId) { + glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url) + if e := 
vacuumVolume_Compact(url, vid); e != nil { + glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e) + ch <- false + } else { + glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url) + ch <- true + } + }(index, dn.Url(), vid) + } + isVacuumSuccess := true + for _ = range locationlist.list { + select { + case _ = <-ch: + case <-time.After(30 * time.Minute): + isVacuumSuccess = false + break + } + } + return isVacuumSuccess +} +func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool { + isCommitSuccess := true + for _, dn := range locationlist.list { + glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url()) + if e := vacuumVolume_Commit(dn.Url(), vid); e != nil { + glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e) + isCommitSuccess = false + } else { + glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url()) + } + if isCommitSuccess { + vl.SetVolumeAvailable(dn, vid) + } + } + return isCommitSuccess +} +func (t *Topology) Vacuum(garbageThreshold string) int { + glog.V(0).Infoln("Start vacuum on demand") + for _, col := range t.collectionMap.Items() { + c := col.(*Collection) + glog.V(0).Infoln("vacuum on collection:", c.Name) + for _, vl := range c.storageType2VolumeLayout.Items() { + if vl != nil { + volumeLayout := vl.(*VolumeLayout) + for vid, locationlist := range volumeLayout.vid2location { + glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid) + if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { + batchVacuumVolumeCommit(volumeLayout, vid, locationlist) + } + } + } + } + } + } + return 0 +} + +type VacuumVolumeResult struct { + Result bool + Error string +} + +func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) { + values := make(url.Values) + values.Add("volume", vid.String()) + values.Add("garbageThreshold", garbageThreshold) + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values) + if err != nil { + glog.V(0).Infoln("parameters:", values) + return err, false + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err, false + } + if ret.Error != "" { + return errors.New(ret.Error), false + } + return nil, ret.Result +} +func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error { + values := make(url.Values) + values.Add("volume", vid.String()) + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values) + if err != nil { + return err + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +} +func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error { + values := make(url.Values) + values.Add("volume", vid.String()) + jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values) + if err != nil { + return err + } + var ret VacuumVolumeResult + if err := json.Unmarshal(jsonBlob, &ret); err != nil { + return err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil +} diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go new file mode 100644 index 000000000..3a1c9c567 --- /dev/null +++ b/weed/topology/volume_growth.go @@ -0,0 +1,211 @@ +package topology + +import ( + "fmt" + "math/rand" + "sync" + + 
"github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +/* +This package is created to resolve these replica placement issues: +1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies +2. in time of tight storage, how to reduce replica level +3. optimizing for hot data on faster disk, cold data on cheaper storage, +4. volume allocation for each bucket +*/ + +type VolumeGrowOption struct { + Collection string + ReplicaPlacement *storage.ReplicaPlacement + Ttl *storage.TTL + DataCenter string + Rack string + DataNode string +} + +type VolumeGrowth struct { + accessLock sync.Mutex +} + +func (o *VolumeGrowOption) String() string { + return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode) +} + +func NewDefaultVolumeGrowth() *VolumeGrowth { + return &VolumeGrowth{} +} + +// one replication type may need rp.GetCopyCount() actual volumes +// given copyCount, how many logical volumes to create +func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) { + switch copyCount { + case 1: + count = 7 + case 2: + count = 6 + case 3: + count = 3 + default: + count = 1 + } + return +} + +func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) { + count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo) + if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 { + return count, nil + } + return count, err +} +func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) { + vg.accessLock.Lock() + defer vg.accessLock.Unlock() + + for i := 0; i < targetCount; i++ { + if c, e := vg.findAndGrow(topo, option); e == nil { + counter += c + } else { + return counter, e + } + } + return +} + +func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) { + servers, e := vg.findEmptySlotsForOneVolume(topo, option) + if e != nil { + return 0, e + } + vid := topo.NextVolumeId() + err := vg.grow(topo, vid, option, servers...) + return len(servers), err +} + +// 1. find the main data node +// 1.1 collect all data nodes that have 1 slots +// 2.2 collect all racks that have rp.SameRackCount+1 +// 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1 +// 2. 
find rest data nodes +func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { + //find main datacenter and other data centers + rp := option.ReplicaPlacement + mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error { + if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { + return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) + } + if len(node.Children()) < rp.DiffRackCount+1 { + return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) + } + if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1) + } + possibleRacksCount := 0 + for _, rack := range node.Children() { + possibleDataNodesCount := 0 + for _, n := range rack.Children() { + if n.FreeSpace() >= 1 { + possibleDataNodesCount++ + } + } + if possibleDataNodesCount >= rp.SameRackCount+1 { + possibleRacksCount++ + } + } + if possibleRacksCount < rp.DiffRackCount+1 { + return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1) + } + return nil + }) + if dc_err != nil { + return nil, dc_err + } + + //find main rack and other racks + mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error { + if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { + return fmt.Errorf("Not matching preferred rack:%s", option.Rack) + } + if node.FreeSpace() < rp.SameRackCount+1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1) + } + if len(node.Children()) < rp.SameRackCount+1 { + // a bit faster way to test free racks + return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1) + } + possibleDataNodesCount := 0 + for _, n := range node.Children() { + if n.FreeSpace() >= 1 { + possibleDataNodesCount++ + } + } + if possibleDataNodesCount < rp.SameRackCount+1 { + return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1) + } + return nil + }) + if rack_err != nil { + return nil, rack_err + } + + //find main rack and other racks + mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error { + if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { + return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) + } + if node.FreeSpace() < 1 { + return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1) + } + return nil + }) + if server_err != nil { + return nil, server_err + } + + servers = append(servers, mainServer.(*DataNode)) + for _, server := range otherServers { + servers = append(servers, server.(*DataNode)) + } + for _, rack := range otherRacks { + r := rand.Intn(rack.FreeSpace()) + if server, e := rack.ReserveOneVolume(r); e == nil { + servers = append(servers, server) + } else { + return servers, e + } + } + for _, datacenter := range otherDataCenters { + r := rand.Intn(datacenter.FreeSpace()) + if server, e := datacenter.ReserveOneVolume(r); e == nil { + servers = append(servers, server) + } else { + return servers, e + } + } + return +} + +func (vg *VolumeGrowth) grow(topo *Topology, vid 
storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error { + for _, server := range servers { + if err := AllocateVolume(server, vid, option); err == nil { + vi := storage.VolumeInfo{ + Id: vid, + Size: 0, + Collection: option.Collection, + ReplicaPlacement: option.ReplicaPlacement, + Ttl: option.Ttl, + Version: storage.CurrentVersion, + } + server.AddOrUpdateVolume(vi) + topo.RegisterVolumeLayout(vi, server) + glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String()) + } else { + glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err) + return fmt.Errorf("Failed to assign %d: %v", vid, err) + } + } + return nil +} diff --git a/weed/topology/volume_growth_test.go b/weed/topology/volume_growth_test.go new file mode 100644 index 000000000..e5716674a --- /dev/null +++ b/weed/topology/volume_growth_test.go @@ -0,0 +1,135 @@ +package topology + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/chrislusf/seaweedfs/weed/sequence" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +var topologyLayout = ` +{ + "dc1":{ + "rack1":{ + "server111":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":2, "size":12312}, + {"id":3, "size":12312} + ], + "limit":3 + }, + "server112":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":10 + } + }, + "rack2":{ + "server121":{ + "volumes":[ + {"id":4, "size":12312}, + {"id":5, "size":12312}, + {"id":6, "size":12312} + ], + "limit":4 + }, + "server122":{ + "volumes":[], + "limit":4 + }, + "server123":{ + "volumes":[ + {"id":2, "size":12312}, + {"id":3, "size":12312}, + {"id":4, "size":12312} + ], + "limit":5 + } + } + }, + "dc2":{ + }, + "dc3":{ + "rack2":{ + "server321":{ + "volumes":[ + {"id":1, "size":12312}, + {"id":3, "size":12312}, + {"id":5, "size":12312} + ], + "limit":4 + } + } + } +} +` + +func setup(topologyLayout string) *Topology { + var data interface{} + err := json.Unmarshal([]byte(topologyLayout), &data) + if err != nil { + fmt.Println("error:", err) + } + fmt.Println("data:", data) + + //need to connect all nodes first before server adding volumes + topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf", + sequence.NewMemorySequencer(), 32*1024, 5) + if err != nil { + panic("error: " + err.Error()) + } + mTopology := data.(map[string]interface{}) + for dcKey, dcValue := range mTopology { + dc := NewDataCenter(dcKey) + dcMap := dcValue.(map[string]interface{}) + topo.LinkChildNode(dc) + for rackKey, rackValue := range dcMap { + rack := NewRack(rackKey) + rackMap := rackValue.(map[string]interface{}) + dc.LinkChildNode(rack) + for serverKey, serverValue := range rackMap { + server := NewDataNode(serverKey) + serverMap := serverValue.(map[string]interface{}) + rack.LinkChildNode(server) + for _, v := range serverMap["volumes"].([]interface{}) { + m := v.(map[string]interface{}) + vi := storage.VolumeInfo{ + Id: storage.VolumeId(int64(m["id"].(float64))), + Size: uint64(m["size"].(float64)), + Version: storage.CurrentVersion} + server.AddOrUpdateVolume(vi) + } + server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64))) + } + } + } + + return topo +} + +func TestFindEmptySlotsForOneVolume(t *testing.T) { + topo := setup(topologyLayout) + vg := NewDefaultVolumeGrowth() + rp, _ := storage.NewReplicaPlacementFromString("002") + volumeGrowOption := &VolumeGrowOption{ + Collection: "", + ReplicaPlacement: rp, + DataCenter: "dc1", + Rack: "", + DataNode: "", + } + servers, err := vg.findEmptySlotsForOneVolume(topo, 
volumeGrowOption) + if err != nil { + fmt.Println("finding empty slots error :", err) + t.Fail() + } + for _, server := range servers { + fmt.Println("assigned node :", server.Id()) + } +} diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go new file mode 100644 index 000000000..e500de583 --- /dev/null +++ b/weed/topology/volume_layout.go @@ -0,0 +1,226 @@ +package topology + +import ( + "errors" + "fmt" + "math/rand" + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +// mapping from volume to its locations, inverted from server to volume +type VolumeLayout struct { + rp *storage.ReplicaPlacement + ttl *storage.TTL + vid2location map[storage.VolumeId]*VolumeLocationList + writables []storage.VolumeId // transient array of writable volume id + volumeSizeLimit uint64 + accessLock sync.RWMutex +} + +func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout { + return &VolumeLayout{ + rp: rp, + ttl: ttl, + vid2location: make(map[storage.VolumeId]*VolumeLocationList), + writables: *new([]storage.VolumeId), + volumeSizeLimit: volumeSizeLimit, + } +} + +func (vl *VolumeLayout) String() string { + return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit) +} + +func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + if _, ok := vl.vid2location[v.Id]; !ok { + vl.vid2location[v.Id] = NewVolumeLocationList() + } + vl.vid2location[v.Id].Set(dn) + glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount()) + if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { + vl.addToWritable(v.Id) + } else { + vl.removeFromWritable(v.Id) + } +} + +func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + vl.removeFromWritable(v.Id) + delete(vl.vid2location, v.Id) +} + +func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) { + for _, id := range vl.writables { + if vid == id { + return + } + } + vl.writables = append(vl.writables, vid) +} + +func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool { + return uint64(v.Size) < vl.volumeSizeLimit && + v.Version == storage.CurrentVersion && + !v.ReadOnly +} + +func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + + if location := vl.vid2location[vid]; location != nil { + return location.list + } + return nil +} + +func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + + for _, location := range vl.vid2location { + nodes = append(nodes, location.list...) 
+ } + return +} + +func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + + len_writers := len(vl.writables) + if len_writers <= 0 { + glog.V(0).Infoln("No more writable volumes!") + return nil, 0, nil, errors.New("No more writable volumes!") + } + if option.DataCenter == "" { + vid := vl.writables[rand.Intn(len_writers)] + locationList := vl.vid2location[vid] + if locationList != nil { + return &vid, count, locationList, nil + } + return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!") + } + var vid storage.VolumeId + var locationList *VolumeLocationList + counter := 0 + for _, v := range vl.writables { + volumeLocationList := vl.vid2location[v] + for _, dn := range volumeLocationList.list { + if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { + if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { + continue + } + if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { + continue + } + counter++ + if rand.Intn(counter) < 1 { + vid, locationList = v, volumeLocationList + } + } + } + } + return &vid, count, locationList, nil +} + +func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int { + vl.accessLock.RLock() + defer vl.accessLock.RUnlock() + + if option.DataCenter == "" { + return len(vl.writables) + } + counter := 0 + for _, v := range vl.writables { + for _, dn := range vl.vid2location[v].list { + if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { + if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { + continue + } + if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { + continue + } + counter++ + } + } + } + return counter +} + +func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool { + toDeleteIndex := -1 + for k, id := range vl.writables { + if id == vid { + toDeleteIndex = k + break + } + } + if toDeleteIndex >= 0 { + glog.V(0).Infoln("Volume", vid, "becomes unwritable") + vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...) 
+ return true + } + return false +} +func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool { + for _, v := range vl.writables { + if v == vid { + return false + } + } + glog.V(0).Infoln("Volume", vid, "becomes writable") + vl.writables = append(vl.writables, vid) + return true +} + +func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + if location, ok := vl.vid2location[vid]; ok { + if location.Remove(dn) { + if location.Length() < vl.rp.GetCopyCount() { + glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount()) + return vl.removeFromWritable(vid) + } + } + } + return false +} +func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + vl.vid2location[vid].Set(dn) + if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() { + return vl.setVolumeWritable(vid) + } + return false +} + +func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool { + vl.accessLock.Lock() + defer vl.accessLock.Unlock() + + // glog.V(0).Infoln("Volume", vid, "reaches full capacity.") + return vl.removeFromWritable(vid) +} + +func (vl *VolumeLayout) ToMap() map[string]interface{} { + m := make(map[string]interface{}) + m["replication"] = vl.rp.String() + m["ttl"] = vl.ttl.String() + m["writables"] = vl.writables + //m["locations"] = vl.vid2location + return m +} diff --git a/weed/topology/volume_location_list.go b/weed/topology/volume_location_list.go new file mode 100644 index 000000000..d5eaf5e92 --- /dev/null +++ b/weed/topology/volume_location_list.go @@ -0,0 +1,65 @@ +package topology + +import ( + "fmt" +) + +type VolumeLocationList struct { + list []*DataNode +} + +func NewVolumeLocationList() *VolumeLocationList { + return &VolumeLocationList{} +} + +func (dnll *VolumeLocationList) String() string { + return fmt.Sprintf("%v", dnll.list) +} + +func (dnll *VolumeLocationList) Head() *DataNode { + //mark first node as master volume + return dnll.list[0] +} + +func (dnll *VolumeLocationList) Length() int { + return len(dnll.list) +} + +func (dnll *VolumeLocationList) Set(loc *DataNode) { + for i := 0; i < len(dnll.list); i++ { + if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port { + dnll.list[i] = loc + return + } + } + dnll.list = append(dnll.list, loc) +} + +func (dnll *VolumeLocationList) Remove(loc *DataNode) bool { + for i, dnl := range dnll.list { + if loc.Ip == dnl.Ip && loc.Port == dnl.Port { + dnll.list = append(dnll.list[:i], dnll.list[i+1:]...) + return true + } + } + return false +} + +func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) { + var changed bool + for _, dnl := range dnll.list { + if dnl.LastSeen < freshThreshHold { + changed = true + break + } + } + if changed { + var l []*DataNode + for _, dnl := range dnll.list { + if dnl.LastSeen >= freshThreshHold { + l = append(l, dnl) + } + } + dnll.list = l + } +} |
