diff options
Diffstat (limited to 'weed/topology/volume_growth.go')
| -rw-r--r-- | weed/topology/volume_growth.go | 122 |
1 files changed, 100 insertions, 22 deletions
diff --git a/weed/topology/volume_growth.go b/weed/topology/volume_growth.go index c62fd72a0..f7af4e0a5 100644 --- a/weed/topology/volume_growth.go +++ b/weed/topology/volume_growth.go @@ -74,6 +74,22 @@ type VolumeGrowth struct { accessLock sync.Mutex } +// VolumeGrowReservation tracks capacity reservations for a volume creation operation +type VolumeGrowReservation struct { + servers []*DataNode + reservationIds []string + diskType types.DiskType +} + +// releaseAllReservations releases all reservations in this volume grow operation +func (vgr *VolumeGrowReservation) releaseAllReservations() { + for i, server := range vgr.servers { + if i < len(vgr.reservationIds) && vgr.reservationIds[i] != "" { + server.ReleaseReservedCapacity(vgr.reservationIds[i]) + } + } +} + func (o *VolumeGrowOption) String() string { blob, _ := json.Marshal(o) return string(blob) @@ -125,10 +141,17 @@ func (vg *VolumeGrowth) GrowByCountAndType(grpcDialOption grpc.DialOption, targe } func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topology, option *VolumeGrowOption) (result []*master_pb.VolumeLocation, err error) { - servers, e := vg.findEmptySlotsForOneVolume(topo, option) + servers, reservation, e := vg.findEmptySlotsForOneVolume(topo, option, true) // use reservations if e != nil { return nil, e } + // Ensure reservations are released if anything goes wrong + defer func() { + if err != nil && reservation != nil { + reservation.releaseAllReservations() + } + }() + for !topo.LastLeaderChangeTime.Add(constants.VolumePulseSeconds * 2).Before(time.Now()) { glog.V(0).Infof("wait for volume servers to join back") time.Sleep(constants.VolumePulseSeconds / 2) @@ -137,7 +160,7 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo if raftErr != nil { return nil, raftErr } - if err = vg.grow(grpcDialOption, topo, vid, option, servers...); err == nil { + if err = vg.grow(grpcDialOption, topo, vid, option, reservation, servers...); err == nil { for _, server := range servers { result = append(result, &master_pb.VolumeLocation{ Url: server.Url(), @@ -156,9 +179,37 @@ func (vg *VolumeGrowth) findAndGrow(grpcDialOption grpc.DialOption, topo *Topolo // 2.2 collect all racks that have rp.SameRackCount+1 // 2.2 collect all data centers that have DiffRackCount+rp.SameRackCount+1 // 2. find rest data nodes -func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) { +// If useReservations is true, reserves capacity on each server and returns reservation info +func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption, useReservations bool) (servers []*DataNode, reservation *VolumeGrowReservation, err error) { //find main datacenter and other data centers rp := option.ReplicaPlacement + + // Select appropriate functions based on useReservations flag + var availableSpaceFunc func(Node, *VolumeGrowOption) int64 + var reserveOneVolumeFunc func(Node, int64, *VolumeGrowOption) (*DataNode, error) + + if useReservations { + availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 { + return node.AvailableSpaceForReservation(option) + } + reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) { + return node.ReserveOneVolumeForReservation(r, option) + } + } else { + availableSpaceFunc = func(node Node, option *VolumeGrowOption) int64 { + return node.AvailableSpaceFor(option) + } + reserveOneVolumeFunc = func(node Node, r int64, option *VolumeGrowOption) (*DataNode, error) { + return node.ReserveOneVolume(r, option) + } + } + + // Ensure cleanup of partial reservations on error + defer func() { + if err != nil && reservation != nil { + reservation.releaseAllReservations() + } + }() mainDataCenter, otherDataCenters, dc_err := topo.PickNodesByWeight(rp.DiffDataCenterCount+1, option, func(node Node) error { if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) { return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter) @@ -166,14 +217,14 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if len(node.Children()) < rp.DiffRackCount+1 { return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1) } - if node.AvailableSpaceFor(option) < int64(rp.DiffRackCount+rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.DiffRackCount+rp.SameRackCount+1) + if availableSpaceFunc(node, option) < int64(rp.DiffRackCount+rp.SameRackCount+1) { + return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.DiffRackCount+rp.SameRackCount+1) } possibleRacksCount := 0 for _, rack := range node.Children() { possibleDataNodesCount := 0 for _, n := range rack.Children() { - if n.AvailableSpaceFor(option) >= 1 { + if availableSpaceFunc(n, option) >= 1 { possibleDataNodesCount++ } } @@ -187,7 +238,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return nil }) if dc_err != nil { - return nil, dc_err + return nil, nil, dc_err } //find main rack and other racks @@ -195,8 +246,8 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) { return fmt.Errorf("Not matching preferred rack:%s", option.Rack) } - if node.AvailableSpaceFor(option) < int64(rp.SameRackCount+1) { - return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), rp.SameRackCount+1) + if availableSpaceFunc(node, option) < int64(rp.SameRackCount+1) { + return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), rp.SameRackCount+1) } if len(node.Children()) < rp.SameRackCount+1 { // a bit faster way to test free racks @@ -204,7 +255,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum } possibleDataNodesCount := 0 for _, n := range node.Children() { - if n.AvailableSpaceFor(option) >= 1 { + if availableSpaceFunc(n, option) >= 1 { possibleDataNodesCount++ } } @@ -214,7 +265,7 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum return nil }) if rackErr != nil { - return nil, rackErr + return nil, nil, rackErr } //find main server and other servers @@ -222,13 +273,13 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) { return fmt.Errorf("Not matching preferred data node:%s", option.DataNode) } - if node.AvailableSpaceFor(option) < 1 { - return fmt.Errorf("Free:%d < Expected:%d", node.AvailableSpaceFor(option), 1) + if availableSpaceFunc(node, option) < 1 { + return fmt.Errorf("Free:%d < Expected:%d", availableSpaceFunc(node, option), 1) } return nil }) if serverErr != nil { - return nil, serverErr + return nil, nil, serverErr } servers = append(servers, mainServer.(*DataNode)) @@ -236,25 +287,47 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *Volum servers = append(servers, server.(*DataNode)) } for _, rack := range otherRacks { - r := rand.Int64N(rack.AvailableSpaceFor(option)) - if server, e := rack.ReserveOneVolume(r, option); e == nil { + r := rand.Int64N(availableSpaceFunc(rack, option)) + if server, e := reserveOneVolumeFunc(rack, r, option); e == nil { servers = append(servers, server) } else { - return servers, e + return servers, nil, e } } for _, datacenter := range otherDataCenters { - r := rand.Int64N(datacenter.AvailableSpaceFor(option)) - if server, e := datacenter.ReserveOneVolume(r, option); e == nil { + r := rand.Int64N(availableSpaceFunc(datacenter, option)) + if server, e := reserveOneVolumeFunc(datacenter, r, option); e == nil { servers = append(servers, server) } else { - return servers, e + return servers, nil, e } } - return + + // If reservations are requested, try to reserve capacity on each server + if useReservations { + reservation = &VolumeGrowReservation{ + servers: servers, + reservationIds: make([]string, len(servers)), + diskType: option.DiskType, + } + + // Try to reserve capacity on each server + for i, server := range servers { + reservationId, success := server.TryReserveCapacity(option.DiskType, 1) + if !success { + return servers, nil, fmt.Errorf("failed to reserve capacity on server %s", server.Id()) + } + reservation.reservationIds[i] = reservationId + } + + glog.V(1).Infof("Successfully reserved capacity on %d servers for volume creation", len(servers)) + } + + return servers, reservation, nil } -func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, servers ...*DataNode) (growErr error) { +// grow creates volumes on the provided servers, optionally managing capacity reservations +func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid needle.VolumeId, option *VolumeGrowOption, reservation *VolumeGrowReservation, servers ...*DataNode) (growErr error) { var createdVolumes []storage.VolumeInfo for _, server := range servers { if err := AllocateVolume(server, grpcDialOption, vid, option); err == nil { @@ -283,6 +356,10 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid topo.RegisterVolumeLayout(vi, server) glog.V(0).Infof("Registered Volume %d on %s", vid, server.NodeImpl.String()) } + // Release reservations on success since volumes are now registered + if reservation != nil { + reservation.releaseAllReservations() + } } else { // cleaning up created volume replicas for i, vi := range createdVolumes { @@ -291,6 +368,7 @@ func (vg *VolumeGrowth) grow(grpcDialOption grpc.DialOption, topo *Topology, vid glog.Warningf("Failed to clean up volume %d on %s", vid, server.NodeImpl.String()) } } + // Reservations will be released by the caller in case of failure } return growErr |
