aboutsummaryrefslogtreecommitdiff
path: root/src/weed/topology/topology.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-02-10 03:09:26 -0800
committerChris Lu <chris.lu@gmail.com>2013-02-10 03:09:26 -0800
commitcb4e8ec16b5c204718f1efdc1731f1bdd5698ff3 (patch)
tree47f27040c3d80e6849932bf2acf54b68c930f72b /src/weed/topology/topology.go
parentd3b267bac27018b7f70dfec7c258d0556fff4c14 (diff)
downloadseaweedfs-cb4e8ec16b5c204718f1efdc1731f1bdd5698ff3.tar.xz
seaweedfs-cb4e8ec16b5c204718f1efdc1731f1bdd5698ff3.zip
re-organize code directory structure
Diffstat (limited to 'src/weed/topology/topology.go')
-rw-r--r--src/weed/topology/topology.go148
1 files changed, 148 insertions, 0 deletions
diff --git a/src/weed/topology/topology.go b/src/weed/topology/topology.go
new file mode 100644
index 000000000..d0b12def4
--- /dev/null
+++ b/src/weed/topology/topology.go
@@ -0,0 +1,148 @@
+package topology
+
+import (
+ "errors"
+ "io/ioutil"
+ "math/rand"
+ "weed/directory"
+ "weed/sequence"
+ "weed/storage"
+)
+
+type Topology struct {
+ NodeImpl
+
+ //transient vid~servers mapping for each replication type
+ replicaType2VolumeLayout []*VolumeLayout
+
+ pulse int64
+
+ volumeSizeLimit uint64
+
+ sequence sequence.Sequencer
+
+ chanDeadDataNodes chan *DataNode
+ chanRecoveredDataNodes chan *DataNode
+ chanFullVolumes chan storage.VolumeInfo
+
+ configuration *Configuration
+}
+
+func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology {
+ t := &Topology{}
+ t.id = NodeId(id)
+ t.nodeType = "Topology"
+ t.NodeImpl.value = t
+ t.children = make(map[NodeId]Node)
+ t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ t.pulse = int64(pulse)
+ t.volumeSizeLimit = volumeSizeLimit
+
+ t.sequence = sequence.NewSequencer(dirname, sequenceFilename)
+
+ t.chanDeadDataNodes = make(chan *DataNode)
+ t.chanRecoveredDataNodes = make(chan *DataNode)
+ t.chanFullVolumes = make(chan storage.VolumeInfo)
+
+ t.loadConfiguration(confFile)
+
+ return t
+}
+
+func (t *Topology) loadConfiguration(configurationFile string) error {
+ b, e := ioutil.ReadFile(configurationFile)
+ if e == nil {
+ t.configuration, e = NewConfiguration(b)
+ }
+ return e
+}
+
+func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
+ for _, vl := range t.replicaType2VolumeLayout {
+ if vl != nil {
+ if list := vl.Lookup(vid); list != nil {
+ return list
+ }
+ }
+ }
+ return nil
+}
+
+func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
+ if t.FreeSpace() <= 0 {
+ return false, nil, nil
+ }
+ vid := t.NextVolumeId()
+ ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
+ return ret, node, &vid
+}
+
+func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
+ freeSpace := t.FreeSpace()
+ for _, node := range except {
+ freeSpace -= node.FreeSpace()
+ }
+ if freeSpace <= 0 {
+ return false, nil, nil
+ }
+ vid := t.NextVolumeId()
+ ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
+ return ret, node, &vid
+}
+
+func (t *Topology) NextVolumeId() storage.VolumeId {
+ vid := t.GetMaxVolumeId()
+ return vid.Next()
+}
+
+func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
+ replicationTypeIndex := repType.GetReplicationLevelIndex()
+ if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
+ }
+ vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
+ if err != nil {
+ return "", 0, nil, errors.New("No writable volumes avalable!")
+ }
+ fileId, count := t.sequence.NextFileId(count)
+ return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+}
+
+func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
+ replicationTypeIndex := repType.GetReplicationLevelIndex()
+ if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
+ }
+ return t.replicaType2VolumeLayout[replicationTypeIndex]
+}
+
+func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
+ t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
+}
+
+func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
+ dcName, rackName := t.configuration.Locate(ip)
+ dc := t.GetOrCreateDataCenter(dcName)
+ rack := dc.GetOrCreateRack(rackName)
+ dn := rack.FindDataNode(ip, port)
+ if init && dn != nil {
+ t.UnRegisterDataNode(dn)
+ }
+ dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
+ for _, v := range volumeInfos {
+ dn.AddOrUpdateVolume(v)
+ t.RegisterVolumeLayout(&v, dn)
+ }
+}
+
+func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ if string(dc.Id()) == dcName {
+ return dc
+ }
+ }
+ dc := NewDataCenter(dcName)
+ t.LinkChildNode(dc)
+ return dc
+}