aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-10 00:18:07 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-10 00:18:07 -0700
commit6daf22193747f6994767facaa05df4701e6de0fb (patch)
tree6aa48590e128766a162880a925974878555ae4bf
parent8684b0999d5d1bfa5f82581130f1ef32a4cb48d5 (diff)
downloadseaweedfs-6daf22193747f6994767facaa05df4701e6de0fb.tar.xz
seaweedfs-6daf22193747f6994767facaa05df4701e6de0fb.zip
volume layout for each replication level
-rw-r--r--weed-fs/note/startup_process.txt7
-rw-r--r--weed-fs/src/pkg/storage/volume_info.go48
-rw-r--r--weed-fs/src/pkg/topology/data_node.go25
-rw-r--r--weed-fs/src/pkg/topology/node.go28
-rw-r--r--weed-fs/src/pkg/topology/topology.go40
-rw-r--r--weed-fs/src/pkg/topology/volume_layout.go48
-rw-r--r--weed-fs/src/pkg/topology/volume_location.go21
7 files changed, 189 insertions, 28 deletions
diff --git a/weed-fs/note/startup_process.txt b/weed-fs/note/startup_process.txt
deleted file mode 100644
index 558955005..000000000
--- a/weed-fs/note/startup_process.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-1. clients report its own server info, volumes info,
-2. master collect all volumes, separated into readable volumes, writable volumes, volume2machine mapping
- machines is an array of machine info
- writable volumes is an array of vids
- vid2machineId maps volume id to machineId, which is the index of machines array
-
-
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
index cc2cccdca..2d1538bbe 100644
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ b/weed-fs/src/pkg/storage/volume_info.go
@@ -1,10 +1,50 @@
package storage
-import (
+import ()
+type VolumeInfo struct {
+ Id VolumeId
+ Size int64
+ ReplicationType ReplicationType
+}
+type ReplicationType int
+
+const (
+ Copy00 = ReplicationType(00) // single copy
+ Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center
+ Copy10 = ReplicationType(10) // 2 copies, each on different data center
+ Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center
+ Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center
+ LengthRelicationType = 5
)
-type VolumeInfo struct {
- Id VolumeId
- Size int64
+func GetReplicationLevelIndex(v *VolumeInfo) int {
+ switch v.ReplicationType {
+ case Copy00:
+ return 0
+ case Copy01:
+ return 1
+ case Copy10:
+ return 2
+ case Copy11:
+ return 3
+ case Copy20:
+ return 4
+ }
+ return -1
+}
+func GetCopyCount(v *VolumeInfo) int {
+ switch v.ReplicationType {
+ case Copy00:
+ return 1
+ case Copy01:
+ return 2
+ case Copy10:
+ return 2
+ case Copy11:
+ return 3
+ case Copy20:
+ return 3
+ }
+ return 0
}
diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go
index 8085bba29..ea74dd8e0 100644
--- a/weed-fs/src/pkg/topology/data_node.go
+++ b/weed-fs/src/pkg/topology/data_node.go
@@ -8,6 +8,10 @@ import (
type DataNode struct {
NodeImpl
volumes map[storage.VolumeId]*storage.VolumeInfo
+ ip string
+ port int
+ publicUrl string
+ lastSeen int64 // unix time in seconds
}
func NewDataNode(id string) *DataNode {
@@ -17,12 +21,21 @@ func NewDataNode(id string) *DataNode {
s.volumes = make(map[storage.VolumeId]*storage.VolumeInfo)
return s
}
-func (s *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
- s.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024})
+func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
+ dn.AddVolume(&storage.VolumeInfo{Id: vid, Size: 32 * 1024 * 1024 * 1024})
return vid
}
-func (s *DataNode) AddVolume(v *storage.VolumeInfo) {
- s.volumes[v.Id] = v
- s.UpAdjustActiveVolumeCountDelta(1)
- s.UpAdjustMaxVolumeId(v.Id)
+func (dn *DataNode) AddVolume(v *storage.VolumeInfo) {
+ dn.volumes[v.Id] = v
+ dn.UpAdjustActiveVolumeCountDelta(1)
+ dn.UpAdjustMaxVolumeId(v.Id)
+ dn.GetTopology().RegisterVolume(v,dn)
+}
+func (dn *DataNode) GetTopology() *Topology {
+ p := dn.parent
+ for p.Parent()!=nil{
+ p = p.Parent()
+ }
+ t := p.(*Topology)
+ return t
}
diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go
index 9815727ff..fb610bd73 100644
--- a/weed-fs/src/pkg/topology/node.go
+++ b/weed-fs/src/pkg/topology/node.go
@@ -20,10 +20,11 @@ type Node interface {
setParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
+ CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId
IsDataNode() bool
Children() map[NodeId]Node
- Parent() Node
+ Parent() Node
}
type NodeImpl struct {
id NodeId
@@ -65,7 +66,7 @@ func (n *NodeImpl) Children() map[NodeId]Node {
return n.children
}
func (n *NodeImpl) Parent() Node {
- return n.parent
+ return n.parent
}
func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId) (bool, *DataNode) {
ret := false
@@ -146,3 +147,26 @@ func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
fmt.Println(n, "removes", node, "volumeCount =", n.activeVolumeCount)
}
}
+
+func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit uint64) []storage.VolumeId {
+ var ret []storage.VolumeId
+ if n.IsRack() {
+ for _, c := range n.Children() {
+ dn := c.(*DataNode) //can not cast n to DataNode
+ if dn.lastSeen > freshThreshHold {
+ continue
+ }
+ for _, v := range dn.volumes {
+ if uint64(v.Size) < volumeSizeLimit {
+ ret = append(ret, v.Id)
+ }
+ }
+ }
+ } else {
+ for _, c := range n.Children() {
+ ret = append(ret, c.CollectWritableVolumes(freshThreshHold, volumeSizeLimit)...)
+ }
+ }
+
+ return ret
+}
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
index 9e5f7bab4..2e617c927 100644
--- a/weed-fs/src/pkg/topology/topology.go
+++ b/weed-fs/src/pkg/topology/topology.go
@@ -3,37 +3,59 @@ package topology
import (
_ "fmt"
"math/rand"
+ "pkg/sequence"
"pkg/storage"
)
type Topology struct {
NodeImpl
+
+ //transient vid~servers mapping for each replication type
+ replicaType2VolumeLayout []*VolumeLayout
+
+ pulse int64
+
+ volumeSizeLimit uint64
+
+ sequence sequence.Sequencer
}
-func NewTopology(id string) *Topology {
+func NewTopology(id string, dirname string, filename string, volumeSizeLimit uint64, pulse int) *Topology {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
t.children = make(map[NodeId]Node)
+ t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ t.pulse = int64(pulse)
+ t.volumeSizeLimit = volumeSizeLimit
+ t.sequence = sequence.NewSequencer(dirname, filename)
return t
}
func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, storage.VolumeId) {
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
- return ret, node, vid
+ return ret, node, vid
}
func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, storage.VolumeId) {
- freeSpace := t.FreeSpace()
- for _, node := range except {
- freeSpace -= node.FreeSpace()
- }
- vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
- return ret, node, vid
+ freeSpace := t.FreeSpace()
+ for _, node := range except {
+ freeSpace -= node.FreeSpace()
+ }
+ vid := t.NextVolumeId()
+ ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
+ return ret, node, vid
}
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
return vid.Next()
}
+
+func (t *Topology) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ replicationTypeIndex := storage.GetReplicationLevelIndex(v)
+ if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse)
+ }
+ t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn)
+}
diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go
new file mode 100644
index 000000000..f11ea430a
--- /dev/null
+++ b/weed-fs/src/pkg/topology/volume_layout.go
@@ -0,0 +1,48 @@
+package topology
+
+import (
+ "errors"
+ "fmt"
+ "math/rand"
+ "pkg/storage"
+)
+
+type VolumeLayout struct {
+ vid2location map[storage.VolumeId]*DataNodeLocationList
+ writables []storage.VolumeId // transient array of writable volume id
+ pulse int64
+ volumeSizeLimit uint64
+}
+
+func NewVolumeLayout(volumeSizeLimit uint64, pulse int64) *VolumeLayout {
+ return &VolumeLayout{
+ vid2location: make(map[storage.VolumeId]*DataNodeLocationList),
+ writables: *new([]storage.VolumeId),
+ pulse: pulse,
+ volumeSizeLimit: volumeSizeLimit,
+ }
+}
+
+func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ if _, ok := vl.vid2location[v.Id]; !ok {
+ vl.vid2location[v.Id] = NewDataNodeLocationList()
+ }
+ vl.vid2location[v.Id].Add(dn)
+ if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) {
+ vl.writables = append(vl.writables,v.Id)
+ }
+}
+
+func (vl *VolumeLayout) PickForWrite(count int) (int, *DataNodeLocationList, error) {
+ len_writers := len(vl.writables)
+ if len_writers <= 0 {
+ fmt.Println("No more writable volumes!")
+ return 0, nil, errors.New("No more writable volumes!")
+ }
+ vid := vl.writables[rand.Intn(len_writers)]
+ locationList := vl.vid2location[vid]
+ if locationList != nil {
+ return count, locationList, nil
+ }
+ return 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+}
diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go
new file mode 100644
index 000000000..c61ee119b
--- /dev/null
+++ b/weed-fs/src/pkg/topology/volume_location.go
@@ -0,0 +1,21 @@
+package topology
+
+import (
+)
+
+type DataNodeLocationList struct {
+ list []*DataNode
+}
+
+func NewDataNodeLocationList() *DataNodeLocationList {
+ return &DataNodeLocationList{}
+}
+
+func (dnll *DataNodeLocationList) Add(loc *DataNode){
+ for _, dnl := range dnll.list {
+ if loc.ip == dnl.ip && loc.port == dnl.port {
+ break
+ }
+ }
+ dnll.list = append(dnll.list, loc)
+}