package shell

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage"
)

func init() {
	commands = append(commands, &commandVolumeFixReplication{})
}
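
// commandVolumeFixReplication scans the topology for volumes whose actual
// replica count is below their configured replica placement and copies them
// to additional data nodes until the placement is satisfied.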
type commandVolumeFixReplication struct {
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}

func (c *commandVolumeFixReplication) Help() string {
	return `add replicas to volumes that are missing replicas
-n do not take action
`
}

func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {

	takeAction := true
	if len(args) > 0 && args[0] == "-n" {
		takeAction = false
	}
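
	// fetch the current volume topology from the master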
	var resp *master_pb.VolumeListResponse
	ctx := context.Background()
	err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}

	// find all volumes that need replication
	// collect all data nodes
	replicatedVolumeLocations := make(map[uint32][]location)
	replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
	var allLocations []location
	for _, dc := range resp.TopologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, dn := range rack.DataNodeInfos {
				loc := newLocation(dc.Id, rack.Id, dn)
				for _, v := range dn.VolumeInfos {
					if v.ReplicaPlacement > 0 {
						replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
						replicatedVolumeInfo[v.Id] = v
					}
				}
				allLocations = append(allLocations, loc)
			}
		}
	}

	// find all under-replicated volumes
	underReplicatedVolumeLocations := make(map[uint32][]location)
	for vid, locations := range replicatedVolumeLocations {
		volumeInfo := replicatedVolumeInfo[vid]
		replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
		if replicaPlacement.GetCopyCount() > len(locations) {
			underReplicatedVolumeLocations[vid] = locations
		}
	}

	if len(underReplicatedVolumeLocations) == 0 {
		return fmt.Errorf("no under replicated volumes")
	}

	if len(allLocations) == 0 {
		return fmt.Errorf("no data nodes at all")
	}

	// sort data nodes so the most under-populated ones (most free volume slots) come first
	keepDataNodesSorted(allLocations)
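
	// for each under-replicated volume, try the data nodes in order of free
	// capacity and copy from a randomly chosen existing replica once a node
	// satisfies the placement constraints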
	for vid, locations := range underReplicatedVolumeLocations {
		volumeInfo := replicatedVolumeInfo[vid]
		replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
		foundNewLocation := false
		for _, dst := range allLocations {
			// check whether data nodes satisfy the constraints
			if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
				// ask the volume server to replicate the volume
				sourceNodes := underReplicatedVolumeLocations[vid]
				sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
				foundNewLocation = true
				fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)

				if !takeAction {
					break
				}

				err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					_, replicateErr := volumeServerClient.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{
						VolumeId:       volumeInfo.Id,
						Collection:     volumeInfo.Collection,
						SourceDataNode: sourceNode.dataNode.Id,
					})
					return replicateErr
				})
				if err != nil {
					return err
				}

				// adjust free volume count
				dst.dataNode.FreeVolumeCount--
				keepDataNodesSorted(allLocations)
				break
			}
		}
		if !foundNewLocation {
			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
		}
	}

	return nil
}
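
// keepDataNodesSorted orders data nodes by free volume slots, most free first,
// so the placement loop always tries the least-loaded node.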
func keepDataNodesSorted(dataNodes []location) {
	sort.Slice(dataNodes, func(i, j int) bool {
		return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
	})
}
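
// satisfyReplicaPlacement reports whether placing one more replica on
// possibleLocation still respects replicaPlacement: it first checks whether
// another data center is needed, then another rack, then another data node
// within the same rack.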
func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {

	existingDataCenters := make(map[string]bool)
	existingRacks := make(map[string]bool)
	existingDataNodes := make(map[string]bool)
	for _, loc := range existingLocations {
		existingDataCenters[loc.DataCenter()] = true
		existingRacks[loc.Rack()] = true
		existingDataNodes[loc.String()] = true
	}

	if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) {
		// check dc, good if different from any existing data centers
		_, found := existingDataCenters[possibleLocation.DataCenter()]
		return !found
	} else if replicaPlacement.DiffRackCount >= len(existingRacks) {
		// check rack, good if different from any existing racks
		_, found := existingRacks[possibleLocation.Rack()]
		return !found
	} else if replicaPlacement.SameRackCount >= len(existingDataNodes) {
		// check data node, good if different from any existing data nodes
		_, found := existingDataNodes[possibleLocation.String()]
		return !found
	}

	return false
}
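
// location identifies a data node together with the rack and data center it
// belongs to.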
type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}
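
// Rack qualifies the rack name with its data center so racks with the same
// name in different data centers are treated as distinct.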
func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}