1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
package topology
import (
"errors"
"io/ioutil"
"math/rand"
"code.google.com/p/weed-fs/go/directory"
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
)
type Topology struct {
NodeImpl
//transient vid~servers mapping for each replication type
replicaType2VolumeLayout []*VolumeLayout
pulse int64
volumeSizeLimit uint64
sequence sequence.Sequencer
chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
}
func NewTopology(id string, confFile string, dirname string, sequenceFilename string, volumeSizeLimit uint64, pulse int) *Topology {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.sequence = sequence.NewSequencer(dirname, sequenceFilename)
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan storage.VolumeInfo)
t.loadConfiguration(confFile)
return t
}
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
t.configuration, e = NewConfiguration(b)
}
return e
}
func (t *Topology) Lookup(vid storage.VolumeId) []*DataNode {
for _, vl := range t.replicaType2VolumeLayout {
if vl != nil {
if list := vl.Lookup(vid); list != nil {
return list
}
}
}
return nil
}
func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
if t.FreeSpace() <= 0 {
return false, nil, nil
}
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
return ret, node, &vid
}
func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, *storage.VolumeId) {
freeSpace := t.FreeSpace()
for _, node := range except {
freeSpace -= node.FreeSpace()
}
if freeSpace <= 0 {
return false, nil, nil
}
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(freeSpace), vid)
return ret, node, &vid
}
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
return vid.Next()
}
func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
replicationTypeIndex := repType.GetReplicationLevelIndex()
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
}
vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
if err != nil {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
fileId, count := t.sequence.NextFileId(count)
return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
replicationTypeIndex := repType.GetReplicationLevelIndex()
if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
}
return t.replicaType2VolumeLayout[replicationTypeIndex]
}
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
}
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
dcName, rackName := t.configuration.Locate(ip)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.FindDataNode(ip, port)
if init && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
for _, v := range volumeInfos {
dn.AddOrUpdateVolume(v)
t.RegisterVolumeLayout(&v, dn)
}
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
for _, c := range t.Children() {
dc := c.(*DataCenter)
if string(dc.Id()) == dcName {
return dc
}
}
dc := NewDataCenter(dcName)
t.LinkChildNode(dc)
return dc
}
|