author    Chris Lu <chris.lu@gmail.com>  2021-08-08 23:25:16 -0700
committer Chris Lu <chris.lu@gmail.com>  2021-08-08 23:25:16 -0700
commit    734c980040b77d19750cf4f00bb9a39312093d91 (patch)
tree      11e54af8839b8fbc4222ac0cbb16b843dfff47c1
parent    96ce85f5ae6281c14a4a190dcf5f527321c50472 (diff)
volume: support concurrent download data size limit
-rw-r--r--  weed/command/server.go                        1
-rw-r--r--  weed/command/volume.go                       55
-rw-r--r--  weed/server/volume_grpc_admin.go              4
-rw-r--r--  weed/server/volume_grpc_batch_delete.go       2
-rw-r--r--  weed/server/volume_grpc_query.go              2
-rw-r--r--  weed/server/volume_server.go                 38
-rw-r--r--  weed/server/volume_server_handlers.go         7
-rw-r--r--  weed/server/volume_server_handlers_read.go   17
-rw-r--r--  weed/server/volume_server_handlers_write.go   2
-rw-r--r--  weed/storage/store.go                         4
-rw-r--r--  weed/storage/store_ec.go                      6
-rw-r--r--  weed/storage/store_ec_delete.go               2
-rw-r--r--  weed/storage/volume_read.go                   5
-rw-r--r--  weed/storage/volume_vacuum_test.go            2
14 files changed, 91 insertions(+), 56 deletions(-)
diff --git a/weed/command/server.go b/weed/command/server.go
index 9bac2be97..fe10b24f7 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -116,6 +116,7 @@ func init() {
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
+ serverOptions.v.concurrentDownloadLimitMB = cmdServer.Flag.Int("volume.concurrentDownloadLimitMB", 64, "limit total concurrent download size")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 712fa0dce..235eff11b 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -35,31 +35,32 @@ var (
)
type VolumeServerOptions struct {
- port *int
- publicPort *int
- folders []string
- folderMaxLimits []int
- idxFolder *string
- ip *string
- publicUrl *string
- bindIp *string
- masters *string
- idleConnectionTimeout *int
- dataCenter *string
- rack *string
- whiteList []string
- indexType *string
- diskType *string
- fixJpgOrientation *bool
- readMode *string
- cpuProfile *string
- memProfile *string
- compactionMBPerSecond *int
- fileSizeLimitMB *int
- concurrentUploadLimitMB *int
- pprof *bool
- preStopSeconds *int
- metricsHttpPort *int
+ port *int
+ publicPort *int
+ folders []string
+ folderMaxLimits []int
+ idxFolder *string
+ ip *string
+ publicUrl *string
+ bindIp *string
+ masters *string
+ idleConnectionTimeout *int
+ dataCenter *string
+ rack *string
+ whiteList []string
+ indexType *string
+ diskType *string
+ fixJpgOrientation *bool
+ readMode *string
+ cpuProfile *string
+ memProfile *string
+ compactionMBPerSecond *int
+ fileSizeLimitMB *int
+ concurrentUploadLimitMB *int
+ concurrentDownloadLimitMB *int
+ pprof *bool
+ preStopSeconds *int
+ metricsHttpPort *int
// pulseSeconds *int
enableTcp *bool
}
@@ -85,7 +86,8 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
- v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
+ v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size")
+ v.concurrentDownloadLimitMB = cmdVolume.Flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
@@ -232,6 +234,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024,
+ int64(*v.concurrentDownloadLimitMB)*1024*1024,
)
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
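
For orientation, here is a minimal standalone sketch (not part of this commit) of how an MB-valued flag like the ones registered above becomes the byte limit handed to NewVolumeServer; the scaling mirrors the int64(*v.concurrentDownloadLimitMB)*1024*1024 expression in startVolumeServer, and, per the handler checks further down, a value of 0 leaves the limit disabled:

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Mirrors the new -concurrentDownloadLimitMB flag; the 256 MB default
	// matches the standalone `weed volume` command above.
	concurrentDownloadLimitMB := flag.Int("concurrentDownloadLimitMB", 256, "limit total concurrent download size")
	flag.Parse()

	// The server accounts in bytes, so the MB value is scaled once at startup.
	concurrentDownloadLimit := int64(*concurrentDownloadLimitMB) * 1024 * 1024
	fmt.Printf("concurrent download limit: %d bytes (0 means no limit)\n", concurrentDownloadLimit)
}
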
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 2bc108a23..898c3da12 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -225,9 +225,9 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
if !hasEcVolume {
return nil, fmt.Errorf("volume not found %d", req.VolumeId)
}
- count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil)
} else {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
}
if err != nil {
return nil, err
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 2c445f996..3645ad9c9 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -40,7 +40,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
} else {
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusNotFound,
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
index 2f4fab96a..349d10097 100644
--- a/weed/server/volume_grpc_query.go
+++ b/weed/server/volume_grpc_query.go
@@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
return err
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index a0f32700b..034521b4b 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -17,9 +17,12 @@ import (
)
type VolumeServer struct {
- inFlightUploadDataSize int64
- concurrentUploadLimit int64
- inFlightUploadDataLimitCond *sync.Cond
+ inFlightUploadDataSize int64
+ inFlightDownloadDataSize int64
+ concurrentUploadLimit int64
+ concurrentDownloadLimit int64
+ inFlightUploadDataLimitCond *sync.Cond
+ inFlightDownloadDataLimitCond *sync.Cond
SeedMasterNodes []string
currentMaster string
@@ -54,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
compactionMBPerSecond int,
fileSizeLimitMB int,
concurrentUploadLimit int64,
+ concurrentDownloadLimit int64,
) *VolumeServer {
v := util.GetViper()
@@ -67,19 +71,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
vs := &VolumeServer{
- pulseSeconds: pulseSeconds,
- dataCenter: dataCenter,
- rack: rack,
- needleMapKind: needleMapKind,
- FixJpgOrientation: fixJpgOrientation,
- ReadMode: readMode,
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
- compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
- fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
- isHeartbeating: true,
- stopChan: make(chan bool),
- inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
- concurrentUploadLimit: concurrentUploadLimit,
+ pulseSeconds: pulseSeconds,
+ dataCenter: dataCenter,
+ rack: rack,
+ needleMapKind: needleMapKind,
+ FixJpgOrientation: fixJpgOrientation,
+ ReadMode: readMode,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
+ compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
+ fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
+ isHeartbeating: true,
+ stopChan: make(chan bool),
+ inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ concurrentUploadLimit: concurrentUploadLimit,
+ concurrentDownloadLimit: concurrentDownloadLimit,
}
vs.SeedMasterNodes = masterNodes
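
The two new fields reuse the pattern already in place for uploads: an atomically updated in-flight byte counter paired with a sync.Cond that parks requests while the counter exceeds the configured limit. A self-contained sketch of that pattern, simplified from the server code (the byteGate type is invented for illustration):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// byteGate caps the total number of bytes "in flight" at once, in the same
// style as inFlightDownloadDataSize / inFlightDownloadDataLimitCond.
type byteGate struct {
	inFlight int64
	limit    int64
	cond     *sync.Cond
}

func newByteGate(limit int64) *byteGate {
	return &byteGate{limit: limit, cond: sync.NewCond(new(sync.Mutex))}
}

// acquire blocks while the in-flight total is above the limit, then reserves size bytes.
func (g *byteGate) acquire(size int64) {
	g.cond.L.Lock()
	for g.limit != 0 && atomic.LoadInt64(&g.inFlight) > g.limit {
		g.cond.Wait()
	}
	atomic.AddInt64(&g.inFlight, size)
	g.cond.L.Unlock()
}

// release returns size bytes to the budget and wakes one waiter.
func (g *byteGate) release(size int64) {
	atomic.AddInt64(&g.inFlight, -size)
	g.cond.Signal()
}

func main() {
	gate := newByteGate(4 << 20) // 4 MB budget
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			const chunk = 2 << 20 // each request "downloads" 2 MB
			gate.acquire(chunk)
			defer gate.release(chunk)
			time.Sleep(10 * time.Millisecond) // simulate serving the read
			fmt.Printf("request %d served\n", id)
		}(i)
	}
	wg.Wait()
}
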
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 917c7cd25..ed7807bb8 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -37,6 +37,11 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
switch r.Method {
case "GET", "HEAD":
stats.ReadRequest()
+ vs.inFlightDownloadDataLimitCond.L.Lock()
+ for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit {
+ glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
+ vs.inFlightDownloadDataLimitCond.Wait()
+ }
vs.GetOrHeadHandler(w, r)
case "DELETE":
stats.DeleteRequest()
@@ -47,7 +52,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
contentLength := getContentLength(r)
vs.inFlightUploadDataLimitCond.L.Lock()
for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
- glog.V(4).Infof("wait because inflight data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
vs.inFlightUploadDataLimitCond.Wait()
}
atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index cbad9c770..ae3c0b53f 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"mime"
"net/http"
@@ -12,6 +13,7 @@ import (
"path/filepath"
"strconv"
"strings"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -123,11 +125,22 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
var count int
+ var needleSize types.Size
+ onReadSizeFn := func(size types.Size) {
+ needleSize = size
+ atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize))
+ vs.inFlightDownloadDataLimitCond.L.Unlock()
+ }
if hasVolume {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn)
} else if hasEcVolume {
- count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn)
}
+ defer func() {
+ atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(needleSize))
+ vs.inFlightDownloadDataLimitCond.Signal()
+ }()
+
if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
glog.V(4).Infof("read needle: %v", err)
// start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request
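
Unlike uploads, where the handler can charge Content-Length against the budget before doing any work, a download's size is only known once the needle has been looked up. The commit therefore locks the condition variable in privateStoreHandler and has GetOrHeadHandler pass an onReadSizeFn callback down into the store: the callback records the needle size, adds it to the in-flight counter, and releases the lock, while a deferred block subtracts the size and signals waiters once the response is done. A simplified, self-contained sketch of that flow (the wait loop is inlined here rather than living in a separate dispatcher, and readNeedle is a stand-in for store.ReadVolumeNeedle):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Simplified stand-ins for the VolumeServer fields touched by this commit.
type server struct {
	inFlightDownloadDataSize      int64
	concurrentDownloadLimit       int64
	inFlightDownloadDataLimitCond *sync.Cond
}

// readNeedle plays the role of store.ReadVolumeNeedle: the size is only known
// after the needle map lookup, so it is reported through the callback.
func readNeedle(onReadSizeFn func(size int64)) (int, error) {
	size := int64(1 << 20) // pretend the needle turned out to be 1 MB
	if onReadSizeFn != nil {
		onReadSizeFn(size)
	}
	return int(size), nil // the actual disk read would follow here
}

func (s *server) getOrHead() error {
	// In the real server this wait loop lives in privateStoreHandler,
	// which locks the cond before dispatching to GetOrHeadHandler.
	s.inFlightDownloadDataLimitCond.L.Lock()
	for s.concurrentDownloadLimit != 0 &&
		atomic.LoadInt64(&s.inFlightDownloadDataSize) > s.concurrentDownloadLimit {
		s.inFlightDownloadDataLimitCond.Wait()
	}

	var needleSize int64
	onReadSizeFn := func(size int64) {
		needleSize = size
		atomic.AddInt64(&s.inFlightDownloadDataSize, needleSize)
		s.inFlightDownloadDataLimitCond.L.Unlock() // size accounted; let other requests in
	}
	count, err := readNeedle(onReadSizeFn)

	// Give the bytes back and wake a waiter once the response has been served.
	defer func() {
		atomic.AddInt64(&s.inFlightDownloadDataSize, -needleSize)
		s.inFlightDownloadDataLimitCond.Signal()
	}()

	fmt.Println("served", count, "bytes, err =", err)
	return err
}

func main() {
	s := &server{
		concurrentDownloadLimit:       64 * 1024 * 1024, // 64 MB, the embedded-server default
		inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
	}
	_ = s.getOrHead()
}
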
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 58212e8ff..aeb7d6e65 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -108,7 +108,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
- _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
+ _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
diff --git a/weed/storage/store.go b/weed/storage/store.go
index cda1e196b..c407a6081 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -356,9 +356,9 @@ func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, e
return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
}
-func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error) {
+func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
if v := s.findVolume(i); v != nil {
- return v.readNeedle(n, readOption)
+ return v.readNeedle(n, readOption, onReadSizeFn)
}
return 0, fmt.Errorf("volume %d not found", i)
}
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 9702fdd50..6ba7237e2 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -121,7 +121,7 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
}
}
-func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
+func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadSizeFn func(size types.Size)) (int, error) {
for _, location := range s.Locations {
if localEcVolume, found := location.FindEcVolume(vid); found {
@@ -133,6 +133,10 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, e
return 0, ErrorDeleted
}
+ if onReadSizeFn != nil {
+ onReadSizeFn(size)
+ }
+
glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
if len(intervals) > 1 {
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
index 4a75fb20b..6c10af3c5 100644
--- a/weed/storage/store_ec_delete.go
+++ b/weed/storage/store_ec_delete.go
@@ -14,7 +14,7 @@ import (
func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
- count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n)
+ count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
if err != nil {
return 0, err
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index f689eeec0..9751b56ae 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -13,7 +13,7 @@ import (
)
// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
v.dataFileAccessLock.RLock()
defer v.dataFileAccessLock.RUnlock()
@@ -33,6 +33,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
if readSize == 0 {
return 0, nil
}
+ if onReadSizeFn != nil {
+ onReadSizeFn(readSize)
+ }
err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
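
At the storage layer the change is just an optional callback parameter: callers that do not need accounting (the admin, delete, query, and test paths above) pass nil, and readNeedle reports the size as soon as the needle map yields it, before any data is read from disk. A minimal illustration of that nil-safe callback shape (names here are hypothetical, not SeaweedFS APIs):

package main

import "fmt"

// Size stands in for storage/types.Size, the type onReadSizeFn receives.
type Size int32

// readWithAccounting mirrors the shape of the new parameter: the size is
// reported before the (potentially large) read happens, and a nil callback
// is simply skipped.
func readWithAccounting(size Size, onReadSizeFn func(Size)) []byte {
	if onReadSizeFn != nil {
		onReadSizeFn(size) // account for the bytes about to be read
	}
	return make([]byte, int(size)) // stand-in for n.ReadData(...)
}

func main() {
	var total int64
	data := readWithAccounting(4096, func(s Size) { total += int64(s) })
	fmt.Println(len(data), "bytes read,", total, "bytes accounted")

	// Callers that do not care about accounting pass nil, like DeleteHandler above.
	_ = readWithAccounting(1024, nil)
}
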
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index cd5a4f430..89fff4b2b 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -113,7 +113,7 @@ func TestCompaction(t *testing.T) {
}
n := newEmptyNeedle(uint64(i))
- size, err := v.readNeedle(n, nil)
+ size, err := v.readNeedle(n, nil, nil)
if err != nil {
t.Fatalf("read file %d: %v", i, err)
}