aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-05 00:40:04 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-05 00:40:04 -0700
commit96119eab00e13ff056f3058b9032d12c3758cb35 (patch)
tree8d5def6022473d24edb12d9bde5d65cd0bf8c8f0
parent8a663060640a499f8a37a7f60e3a7c5a2f95fe05 (diff)
downloadseaweedfs-96119eab00e13ff056f3058b9032d12c3758cb35.tar.xz
seaweedfs-96119eab00e13ff056f3058b9032d12c3758cb35.zip
refactor
-rw-r--r--weed/server/master_grpc_server_volume.go9
-rw-r--r--weed/server/master_server_handlers.go4
-rw-r--r--weed/server/master_server_handlers_admin.go7
-rw-r--r--weed/topology/volume_layout.go6
4 files changed, 16 insertions, 10 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 4048225f3..95fdc3fd1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -42,8 +42,11 @@ func (ms *MasterServer) ProcessGrowRequest() {
return !found
})
+ option := req.Option
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+
// not atomic but it's okay
- if !found && ms.shouldVolumeGrow(req.Option) {
+ if !found && vl.ShouldGrowVolumes(option) {
filter.Store(req, nil)
// we have lock called inside vg
go func() {
@@ -130,7 +133,9 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
}
- if ms.shouldVolumeGrow(option) {
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+
+ if vl.ShouldGrowVolumes(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("no free volumes left for " + option.String())
}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 36c4239fb..8187ca21f 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -113,7 +113,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return
}
- if ms.shouldVolumeGrow(option) {
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+
+ if vl.ShouldGrowVolumes(option) {
glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 549ea86dc..41a2b570b 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -132,13 +132,6 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
}
}
-func (ms *MasterServer) shouldVolumeGrow(option *topology.VolumeGrowOption) bool {
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
- active, high := vl.GetActiveVolumeCount(option)
- //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
- return active <= high
-}
-
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication")
if replicationString == "" {
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index db05102fe..3e487fb2f 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -310,6 +310,12 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
return &vid, count, locationList, nil
}
+func (vl *VolumeLayout) ShouldGrowVolumes(option *VolumeGrowOption) bool {
+ active, crowded := vl.GetActiveVolumeCount(option)
+ //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
+ return active <= crowded
+}
+
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) (active, crowded int) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()