package topology

import (
	"encoding/json"
	"fmt"
	"math/rand/v2"
	"reflect"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/server/constants"
	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"google.golang.org/grpc"
)

/*
This package resolves these replica placement issues:
1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
2. how to reduce the replica level when storage is tight
3. optimizing placement, e.g., hot data on faster disks, cold data on cheaper storage
4. volume allocation for each bucket
*/
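
// VolumeGrowRequest describes one pending request to grow volumes: the
// placement option to use, how many volumes to add, whether to force growth,
// and a free-form Reason describing why the growth was requested.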
type VolumeGrowRequest struct {
Option *VolumeGrowOption
Count uint32
Force bool
Reason string
}
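
// Equals reports whether two grow requests carry the same Option, Count, and
// Force flag; Reason is not compared.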
func (vg *VolumeGrowRequest) Equals(req *VolumeGrowRequest) bool {
return reflect.DeepEqual(vg.Option, req.Option) && vg.Count == req.Count && vg.Force == req.Force
}
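
// volumeGrowthStrategy configures how many logical volumes one growth cycle
// creates for each replica copy count, plus Threshold, the fullness ratio at
// which the topology considers growing new volumes. For example, with replica
// placement "001" (copy count 2), one cycle creates Copy2Count = 6 logical
// volumes, which occupy 12 physical volume slots.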
type volumeGrowthStrategy struct {
Copy1Count uint32
Copy2Count uint32
Copy3Count uint32
CopyOtherCount uint32
Threshold float64
}
var (
VolumeGrowStrategy = volumeGrowthStrategy{
Copy1Count: 7,
Copy2Count: 6,
Copy3Count: 3,
CopyOtherCount: 1,
Threshold: 0.9,
}
)
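
// VolumeGrowOption carries the placement parameters for creating one volume:
// collection, replica placement, TTL, disk type, preallocation size, and
// optional preferred data center, rack, and data node.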
type VolumeGrowOption struct {
Collection string `json:"collection,omitempty"`
ReplicaPlacement *super_block.ReplicaPlacement `json:"replication,omitempty"`
Ttl *needle.TTL `json:"ttl,omitempty"`
DiskType types.DiskType `json:"disk,omitempty"`
Preallocate int64 `json:"preallocate,omitempty"`
DataCenter string `json:"dataCenter,omitempty"`
Rack string `json:"rack,omitempty"`
DataNode string `json:"dataNode,omitempty"`
MemoryMapMaxSizeMb uint32 `json:"memoryMapMaxSizeMb,omitempty"`
Version uint32 `json:"version,omitempty"`
}
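
// VolumeGrowth serializes growth operations: accessLock ensures only one
// grow runs against the topology at a time.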
type VolumeGrowth struct {
accessLock sync.Mutex
}
// VolumeGrowReservation tracks capacity reservations for a volume creation operation
type VolumeGrowReservation struct {
servers []*DataNode
reservationIds []string
diskType types.DiskType
}
// releaseAllReservations releases all reservations in this volume grow operation
func (vgr *VolumeGrowReservation) releaseAllReservations() {
for i, server := range vgr.servers {
if i < len(vgr.reservationIds) && vgr.reservationIds[i] != "" {
server.ReleaseReservedCapacity(vgr.reservationIds[i])
}
}
}
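
// String renders the option as JSON, for logs and error messages.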
func (o *VolumeGrowOption) String() string {
blob, _ := json.Marshal(o)
return string(blob)
}
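
// NewDefaultVolumeGrowth returns a VolumeGrowth ready for use.
//
// A minimal usage sketch (assuming a configured *Topology topo, a grpc dial
// option, and a replica placement rp parsed with
// super_block.NewReplicaPlacementFromString):
//
//	vg := NewDefaultVolumeGrowth()
//	option := &VolumeGrowOption{Collection: "pics", ReplicaPlacement: rp}
//	locations, err := vg.AutomaticGrowByType(option, grpcDialOption, topo, 0)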
func NewDefaultVolumeGrowth() *VolumeGrowth {
return &VolumeGrowth{}
}

// findVolumeCount returns how many logical volumes to create at once for the
// given replica copy count; each logical volume occupies copyCount physical
// volume slots.
func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count uint32) {
switch copyCount {
case 1:
count = VolumeGrowStrategy.Copy1Count
case 2:
count = VolumeGrowStrategy.Copy2Count
case 3:
count = VolumeGrowStrategy.Copy3Count
default:
count = VolumeGrowStrategy.CopyOtherCount
}
return
}
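
// AutomaticGrowByType grows targetCount volumes, defaulting to the strategy
// count for the option's replica copy count when targetCount is 0. If growth
// stops early but the volumes created so far form complete replica sets
// (len(result) is a positive multiple of the copy count), the partial result
// is returned without an error.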
func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, grpcDialOption grpc.DialOption, topo *Topology, targetCount uint32) (result []*master_pb.VolumeLocation, err error) {
if targetCount == 0 {
targetCount = vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount())
}
result, err = vg.GrowByCountAndType(grpcDialOption, targetCount, option, topo)
if len(result) > 0 && len(result)%option.ReplicaPlacement.GetCopyCount() == 0 {
return result, nil
}
return result, err
}
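
// GrowByCountAndType creates targetCount volumes one at a time under the
// access lock, stopping at the first failure and returning whatever was
// created so far along with the error.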
func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targetCount uint32, option *VolumeGrowOption, topo *Topology) (result []*master_pb.VolumeLocation, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := uint32(0); i < targetCount; i++ {
if res, e := vg.findAndGrow(grpcDialOption, topo, option); e == nil {
result = append(result, res...)
} else {
glog.V(0).Infof("create %d volume, created %d: %v", targetCount, len(result), e)
return result, e
}
}
return
}
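
// findAndGrow picks servers with free slots (reserving capacity on them),
// waits out any recent leader change, allocates the next volume id, and then
// creates the volume on the selected servers. On failure the reservations are
// released here; on success grow releases them once the volumes are
// registered in the topology.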
func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) {
servers, reservation, e := vg.findEmptySlotsForOneVolume(topo, option, true) // use reservations
if e != nil {
return nil, e
}
// Ensure reservations are released if anything goes wrong
defer func() {
if err != nil && reservation != nil {
reservation.releaseAllReservations()
}
}()
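	// After a leadership change, give volume servers up to two pulse periods
	// to re-register with the new leader before allocating a new volume id.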
for !topo.LastLeaderChangeTime.Add(constants.VolumePulseSeconds * 2).Before(time.Now()) {
glog.V(0).Infof("wait for volume servers to join back")
time.Sleep(constants.VolumePulseSeconds / 2)
}
vid, raftErr := topo.NextVolumeId()
if raftErr != nil {
return nil, raftErr
}
if err = vg.grow(grpcDialOption, topo, vid, option, reservation, servers...); err == nil {
for _, server := range servers {
result = append(result, &master_pb.VolumeLocation{
Url: server.Url(),
PublicUrl: server.PublicUrl,
DataCenter: server.GetDataCenterId(),
GrpcPort: uint32(server.GrpcPort),
NewVids: []uint32{uint32(vid)},
})
}
}
return
}

// findEmptySlotsForOneVolume picks the servers to hold one volume and its replicas:
// 1. find the main data node
//    1.1 collect all data nodes that have at least one free slot
//    1.2 collect all racks that have rp.SameRackCount+1 free slots
//    1.3 collect all data centers that have rp.DiffRackCount+rp.SameRackCount+1 free slots
// 2. find the rest of the data nodes
// If useReservations is true, capacity is reserved on each selected server and
// the reservation info is returned for later release.
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, useReservations bool) (servers []*DataNode, reservation *VolumeGrowReservation, err error) {
//find main datacenter and other data centers
rp := option.ReplicaPlacement
// Track tentative reservations to make the process atomic
var tentativeReservation *VolumeGrowReservation
// Select appropriate functions based on useReservations flag
var availableSpaceFunc func(Node, *VolumeGrowOption) int64
var reserveOneVolumeFunc func(Node, int64, *VolumeGrowOption) (*DataNode, error)
if useReservations {
// Initialize tentative reservation tracking
tentativeReservation = &VolumeGrowReservation{
servers: make([]*DataNode, 0),
reservationIds: make([]string, 0),
diskType: option.DiskType,
}
// For reservations, we make actual reservations during node selection
availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 {
return node.AvailableSpaceForReservation(option)
}
reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) {
return node.ReserveOneVolumeForReservation(r, option)
}
} else {
availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 {
return node.AvailableSpaceFor(option)
}
reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) {
return node.ReserveOneVolume(r, option)
}
}
// Ensure cleanup of partial reservations on error
defer func() {
if err != nil && tentativeReservation != nil {
tentativeReservation.releaseAllReservations()
}
}()
	mainDataCenter, otherDataCenters, dcErr := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error {
		if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
			return fmt.Errorf("not matching preferred data center: %s", option.DataCenter)
		}
		if len(node.Children()) < rp.DiffRackCount+1 {
			return fmt.Errorf("only has %d racks, not enough for %d", len(node.Children()), rp.DiffRackCount+1)
		}
		if availableSpaceFunc(node, option) < int64(rp.DiffRackCount+rp.SameRackCount+1) {
			return fmt.Errorf("free: %d < expected: %d", availableSpaceFunc(node, option), rp.DiffRackCount+rp.SameRackCount+1)
		}
possibleRacksCount := 0
for _, rack := range node.Children() {
possibleDataNodesCount := 0
for _, n := range rack.Children() {
if availableSpaceFunc(n, option) >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount >= rp.SameRackCount+1 {
possibleRacksCount++
}
}
if possibleRacksCount < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
}
return nil
})
	if dcErr != nil {
		return nil, nil, dcErr
	}
//find main rack and other racks
	mainRack, otherRacks, rackErr := mainDataCenter.(*DataCenter).PickNodesByWeight(rp.DiffRackCount+1, option, func(node Node) error {
		if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
			return fmt.Errorf("not matching preferred rack: %s", option.Rack)
		}
		if availableSpaceFunc(node, option) < int64(rp.SameRackCount+1) {
			return fmt.Errorf("free: %d < expected: %d", availableSpaceFunc(node, option), rp.SameRackCount+1)
		}
		if len(node.Children()) < rp.SameRackCount+1 {
			// a quick child-count check before scanning individual data nodes
			return fmt.Errorf("only has %d data nodes, not enough for %d", len(node.Children()), rp.SameRackCount+1)
		}
possibleDataNodesCount := 0
for _, n := range node.Children() {
if availableSpaceFunc(n, option) >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount < rp.SameRackCount+1 {
return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
}
return nil
})
if rackErr != nil {
return nil, nil, rackErr
}
//find main server and other servers
mainServer, otherServers, serverErr := mainRack.(*Rack).PickNodesByWeight(rp.SameRackCount+1, option, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
if useReservations {
// For reservations, atomically check and reserve capacity
if node.IsDataNode() {
reservationId, success := node.TryReserveCapacity(option.DiskType, 1)
if !success {
return fmt.Errorf("Cannot reserve capacity on node %s", node.Id())
}
// Track the reservation for later cleanup if needed
tentativeReservation.servers = append(tentativeReservation.servers, node.(*DataNode))
tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId)
			} else if availableSpaceFunc(node, option) < 1 {
				return fmt.Errorf("free: %d < expected: 1", availableSpaceFunc(node, option))
			}
		} else if availableSpaceFunc(node, option) < 1 {
			return fmt.Errorf("free: %d < expected: 1", availableSpaceFunc(node, option))
		}
return nil
})
if serverErr != nil {
return nil, nil, serverErr
}
servers = append(servers, mainServer.(*DataNode))
for _, server := range otherServers {
servers = append(servers, server.(*DataNode))
}
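	// Pick one server from each of the other racks, selecting a free slot at
	// random so the choice is weighted by each rack's available space.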
for _, rack := range otherRacks {
r := rand.Int64N(availableSpaceFunc(rack, option))
if server, e := reserveOneVolumeFunc(rack, r, option); e == nil {
servers = append(servers, server)
// If using reservations, also make a reservation on the selected server
if useReservations {
reservationId, success := server.TryReserveCapacity(option.DiskType, 1)
if !success {
return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other rack", server.Id())
}
tentativeReservation.servers = append(tentativeReservation.servers, server)
tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId)
}
} else {
return servers, nil, e
}
}
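	// Likewise, pick one server from each of the other data centers.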
for _, datacenter := range otherDataCenters {
r := rand.Int64N(availableSpaceFunc(datacenter, option))
if server, e := reserveOneVolumeFunc(datacenter, r, option); e == nil {
servers = append(servers, server)
// If using reservations, also make a reservation on the selected server
if useReservations {
reservationId, success := server.TryReserveCapacity(option.DiskType, 1)
if !success {
return servers, nil, fmt.Errorf("failed to reserve capacity on server %s from other datacenter", server.Id())
}
tentativeReservation.servers = append(tentativeReservation.servers, server)
tentativeReservation.reservationIds = append(tentativeReservation.reservationIds, reservationId)
}
} else {
return servers, nil, e
}
}
// If reservations were made, return the tentative reservation
if useReservations && tentativeReservation != nil {
reservation = tentativeReservation
glog.V(1).Infof("Successfully reserved capacity on %d servers for volume creation", len(servers))
}
return servers, reservation, nil
}
// grow creates volumes on the provided servers, optionally managing capacity reservations
func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, reservation *VolumeGrowReservation, servers ...*DataNode) (growErr error) {
var createdVolumes []storage.VolumeInfo
for _, server := range servers {
if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil {
createdVolumes = append(createdVolumes, storage.VolumeInfo{
Id: vid,
Size: 0,
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
Version: needle.Version(option.Version),
DiskType: option.DiskType.String(),
ModifiedAtSecond: time.Now().Unix(),
})
glog.V(0).Infof("Created Volume %d on %s", vid, server.NodeImpl.String())
} else {
glog.Warningf("Failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
growErr = fmt.Errorf("failed to assign volume %d on %s: %v", vid, server.NodeImpl.String(), err)
break
}
}
if growErr == nil {
for i, vi := range createdVolumes {
server := servers[i]
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infof("Registered Volume %d on %s", vid, server.NodeImpl.String())
}
// Release reservations on success since volumes are now registered
if reservation != nil {
reservation.releaseAllReservations()
}
} else {
// cleaning up created volume replicas
for i, vi := range createdVolumes {
server := servers[i]
if err := DeleteVolume(server, grpcDialOption, vi.Id); err != nil {
glog.Warningf("Failed to clean up volume %d on %s", vid, server.NodeImpl.String())
}
}
// Reservations will be released by the caller in case of failure
}
return growErr
}