aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-12-27 11:45:44 -0800
committerchrislu <chris.lu@gmail.com>2023-12-27 11:45:44 -0800
commitbebbc9fe444cd1e8eafe59cc1dd5129d61cad7a4 (patch)
tree558e14ee68027b7515a815a0f3ae50815898c15b
parent263b7535bb3acf8f7f8db8937a4a03d19d1b4f86 (diff)
downloadseaweedfs-bebbc9fe444cd1e8eafe59cc1dd5129d61cad7a4.tar.xz
seaweedfs-bebbc9fe444cd1e8eafe59cc1dd5129d61cad7a4.zip
create volume grow request if the selected volume is close to full
-rw-r--r--weed/server/master_grpc_server_assign.go26
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server_handlers.go7
-rw-r--r--weed/topology/topology.go16
-rw-r--r--weed/topology/volume_layout.go57
5 files changed, 62 insertions, 46 deletions
diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go
index 2923b52c1..2aede2d50 100644
--- a/weed/server/master_grpc_server_assign.go
+++ b/weed/server/master_grpc_server_assign.go
@@ -71,17 +71,6 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
- if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) {
- if ms.Topo.AvailableSpaceFor(option) <= 0 {
- return nil, fmt.Errorf("no free volumes left for " + option.String())
- }
- vl.AddGrowRequest()
- ms.vgCh <- &topology.VolumeGrowRequest{
- Option: option,
- Count: int(req.WritableVolumeCount),
- }
- }
-
var (
lastErr error
maxTimeout = time.Second * 10
@@ -89,9 +78,20 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
)
for time.Now().Sub(startTime) < maxTimeout {
- fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option)
+ fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl)
+ if shouldGrow && !vl.HasGrowRequest() {
+ // if picked volume is almost full, trigger a volume-grow request
+ if ms.Topo.AvailableSpaceFor(option) <= 0 {
+ return nil, fmt.Errorf("no free volumes left for " + option.String())
+ }
+ vl.AddGrowRequest()
+ ms.vgCh <- &topology.VolumeGrowRequest{
+ Option: option,
+ Count: int(req.WritableVolumeCount),
+ }
+ }
if err != nil {
- glog.Warningf("PickForWrite %+v: %v", req, err)
+ // glog.Warningf("PickForWrite %+v: %v", req, err)
lastErr = err
time.Sleep(200 * time.Millisecond)
continue
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 4fa6406a7..ba18ce649 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -71,6 +71,8 @@ func (ms *MasterServer) ProcessGrowRequest() {
} else {
glog.V(4).Infoln("discard volume grow request")
+ time.Sleep(time.Millisecond * 211)
+ vl.DoneGrowRequest()
}
}
}()
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 2f2fa199d..6ade9402f 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -119,12 +119,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
- if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) {
+ fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl)
+ if shouldGrow && !vl.HasGrowRequest() {
+ // if picked volume is almost full, trigger a volume-grow request
glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
return
}
+
errCh := make(chan error, 1)
vl.AddGrowRequest()
ms.vgCh <- &topology.VolumeGrowRequest{
@@ -137,10 +140,10 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return
}
}
- fid, count, dnList, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil {
ms.maybeAddJwtAuthorization(w, fid, true)
dn := dnList.Head()
+
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
} else {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 1397a0288..03d7570c1 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -201,16 +201,18 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
return next, nil
}
-func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *VolumeLocationList, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType).PickForWrite(count, option)
+func (t *Topology) PickForWrite(requestedCount uint64, option *VolumeGrowOption, volumeLayout *VolumeLayout) (fileId string, count uint64, volumeLocationList *VolumeLocationList, shouldGrow bool, err error) {
+ var vid needle.VolumeId
+ vid, count, volumeLocationList, shouldGrow, err = volumeLayout.PickForWrite(requestedCount, option)
if err != nil {
- return "", 0, nil, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
+ return "", 0, nil, shouldGrow, fmt.Errorf("failed to find writable volumes for collection:%s replication:%s ttl:%s error: %v", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String(), err)
}
- if datanodes.Length() == 0 {
- return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
+ if volumeLocationList.Length() == 0 {
+ return "", 0, nil, shouldGrow, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
- fileId := t.Sequence.NextFileId(count)
- return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes, nil
+ nextFileId := t.Sequence.NextFileId(requestedCount)
+ fileId = needle.NewFileId(vid, nextFileId, rand.Uint32()).String()
+ return fileId, count, volumeLocationList, shouldGrow, nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 21f5b7267..1f7511cc7 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -106,7 +106,6 @@ func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState {
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
growRequestCount int32
- growRequestTime time.Time
rp *super_block.ReplicaPlacement
ttl *needle.TTL
diskType types.DiskType
@@ -281,28 +280,41 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*needle.VolumeId, uint64, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (vid needle.VolumeId, counter uint64, locationList *VolumeLocationList, shouldGrow bool, err error) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
lenWriters := len(vl.writables)
if lenWriters <= 0 {
//glog.V(0).Infoln("No more writable volumes!")
- return nil, 0, nil, errors.New("No more writable volumes!")
+ shouldGrow = true
+ return 0, 0, nil, shouldGrow, errors.New("No more writable volumes!")
}
if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
vid := vl.writables[rand.Intn(lenWriters)]
- locationList := vl.vid2location[vid]
- if locationList != nil {
- return &vid, count, locationList, nil
+ locationList = vl.vid2location[vid]
+ if locationList != nil && locationList.Length() > 0 {
+ // check whether picked file is close to full
+ dn := locationList.Head()
+ info, _ := dn.GetVolumesById(vid)
+ if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
+ shouldGrow = true
+ }
+ return vid, count, locationList, shouldGrow, nil
}
- return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+ return 0, 0, nil, shouldGrow, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
- var vid needle.VolumeId
- var locationList *VolumeLocationList
- counter := 0
- for _, v := range vl.writables {
- volumeLocationList := vl.vid2location[v]
+
+ // clone vl.writables
+ writables := make([]needle.VolumeId, len(vl.writables))
+ copy(writables, vl.writables)
+ // randomize the writables
+ rand.Shuffle(len(writables), func(i, j int) {
+ writables[i], writables[j] = writables[j], writables[i]
+ })
+
+ for _, writableVolumeId := range writables {
+ volumeLocationList := vl.vid2location[writableVolumeId]
for _, dn := range volumeLocationList.list {
if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) {
continue
@@ -313,29 +325,26 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
continue
}
- counter++
- if rand.Intn(counter) < 1 {
- vid, locationList = v, volumeLocationList.Copy()
+ vid, locationList = writableVolumeId, volumeLocationList.Copy()
+ // check whether picked file is close to full
+ info, _ := dn.GetVolumesById(writableVolumeId)
+ if float64(info.Size) > float64(vl.volumeSizeLimit)*option.Threshold() {
+ shouldGrow = true
}
+ return
}
}
- return &vid, count, locationList, nil
+ return vid, count, locationList, shouldGrow, fmt.Errorf("No writable volumes in DataCenter:%v Rack:%v DataNode:%v", option.DataCenter, option.Rack, option.DataNode)
}
func (vl *VolumeLayout) HasGrowRequest() bool {
- if atomic.LoadInt32(&vl.growRequestCount) > 0 &&
- vl.growRequestTime.Add(time.Minute).After(time.Now()) {
- return true
- }
- return false
+ return atomic.LoadInt32(&vl.growRequestCount) > 0
}
func (vl *VolumeLayout) AddGrowRequest() {
- vl.growRequestTime = time.Now()
atomic.AddInt32(&vl.growRequestCount, 1)
}
func (vl *VolumeLayout) DoneGrowRequest() {
- vl.growRequestTime = time.Unix(0, 0)
- atomic.StoreInt32(&vl.growRequestCount, 0)
+ atomic.AddInt32(&vl.growRequestCount, -1)
}
func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {