aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_grpc_server_volume.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/master_grpc_server_volume.go')
-rw-r--r--weed/server/master_grpc_server_volume.go52
1 files changed, 35 insertions, 17 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index e40f054e0..6d5ee0385 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
"math/rand/v2"
"strings"
"sync"
@@ -20,6 +21,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
+const (
+ volumeGrowStepCount = 2
+)
+
func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
glog.V(1).Infoln("starting automatic volume grow")
start := time.Now()
@@ -36,38 +41,48 @@ func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
func (ms *MasterServer) ProcessGrowRequest() {
go func() {
+ ctx := context.Background()
for {
if !ms.Topo.IsLeader() {
continue
}
dcs := ms.Topo.ListDataCenters()
+ var err error
for _, vlc := range ms.Topo.ListVolumeLayoutCollections() {
vl := vlc.VolumeLayout
+ lastGrowCount := vl.GetLastGrowCount()
if vl.HasGrowRequest() {
continue
}
- if vl.ShouldGrowVolumes(vlc.Collection) {
- vl.AddGrowRequest()
- ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
- Option: vlc.ToGrowOption(),
- Count: vl.GetLastGrowCount(),
- Reason: "collection autogrow",
- }
- } else {
+ writable, crowded := vl.GetWritableVolumeCount()
+ mustGrow := int(lastGrowCount) - writable
+ vgr := vlc.ToVolumeGrowRequest()
+ stats.MasterVolumeLayoutWritable.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(writable))
+ stats.MasterVolumeLayoutCrowded.WithLabelValues(vlc.Collection, vgr.DiskType, vgr.Replication, vgr.Ttl).Set(float64(crowded))
+
+ switch {
+ case mustGrow > 0:
+ vgr.WritableVolumeCount = uint32(mustGrow)
+ _, err = ms.VolumeGrow(ctx, vgr)
+ case crowded+volumeGrowStepCount >= writable:
+ vgr.WritableVolumeCount = volumeGrowStepCount
+ _, err = ms.VolumeGrow(ctx, vgr)
+ default:
for _, dc := range dcs {
if vl.ShouldGrowVolumesByDataNode("DataCenter", dc) {
- vl.AddGrowRequest()
- volumeGrowOption := vlc.ToGrowOption()
- volumeGrowOption.DataCenter = dc
- ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
- Option: volumeGrowOption,
- Count: vl.GetLastGrowCount(),
- Force: true,
- Reason: "per-dc autogrow",
+ vgr.DataCenter = dc
+ if lastGrowCount > 0 {
+ vgr.WritableVolumeCount = uint32(int(lastGrowCount) / len(dcs))
+ } else {
+ vgr.WritableVolumeCount = volumeGrowStepCount
}
+ _, err = ms.VolumeGrow(ctx, vgr)
}
}
}
+ if err != nil {
+ glog.V(0).Infof("volume grow request failed: %+v", err)
+ }
}
time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second)
}
@@ -101,7 +116,7 @@ func (ms *MasterServer) ProcessGrowRequest() {
})
// not atomic but it's okay
- if found || (!req.Force && !vl.ShouldGrowVolumes(req.Option.Collection)) {
+ if found || (!req.Force && !vl.ShouldGrowVolumes()) {
glog.V(4).Infoln("discard volume grow request")
time.Sleep(time.Millisecond * 211)
vl.DoneGrowRequest()
@@ -302,6 +317,9 @@ func (ms *MasterServer) VolumeGrow(ctx context.Context, req *master_pb.VolumeGro
if err != nil {
return nil, err
}
+ if req.DataCenter != "" && !ms.Topo.DataCenterExists(req.DataCenter) {
+ return nil, fmt.Errorf("data center not exists")
+ }
volumeGrowOption := topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,