aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLHHDZ <changlin.shi@ly.com>2022-08-05 16:16:42 +0800
committerGitHub <noreply@github.com>2022-08-05 01:16:42 -0700
commit84ec68e11af0e7ec08c188714f912955cd71759a (patch)
tree006ee731be2ff7072d91b46006ddee184efb2314
parentb278bb24d380fe8c6ea6a31e0250791abb97d807 (diff)
downloadseaweedfs-84ec68e11af0e7ec08c188714f912955cd71759a.tar.xz
seaweedfs-84ec68e11af0e7ec08c188714f912955cd71759a.zip
Add download speed limit support (#3408)
-rw-r--r--weed/command/filer.go3
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/filer/stream.go6
-rw-r--r--weed/server/filer_server.go1
-rw-r--r--weed/server/filer_server_handlers_read.go2
5 files changed, 12 insertions, 1 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 888dc2d03..5c1e653cb 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -59,6 +59,7 @@ type FilerOptions struct {
debugPort *int
localSocket *string
showUIDirectoryDelete *bool
+ downloadMaxMBps *int
}
func init() {
@@ -87,6 +88,7 @@ func init() {
f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
+ f.downloadMaxMBps = cmdFiler.Flag.Int("downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -239,6 +241,7 @@ func (fo *FilerOptions) startFiler() {
SaveToFilerLimit: int64(*fo.saveToFilerLimit),
ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
+ DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index d4c7fb64e..1f5aa5727 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -115,6 +115,7 @@ func init() {
filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button")
+ filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 9ac74d0a0..b4ec58478 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -68,6 +68,10 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
}
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+ return StreamContentWithThrottler(masterClient, writer, chunks, offset, size, 0)
+}
+
+func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) error {
glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
@@ -95,6 +99,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
fileId2Url[chunkView.FileId] = urlStrings
}
+ downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
for _, chunkView := range chunkViews {
if offset < chunkView.LogicOffset {
@@ -118,6 +123,7 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
return fmt.Errorf("read chunk: %v", err)
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
+ downloadThrottler.MaybeSlowdown(int64(chunkView.Size))
}
if remaining > 0 {
glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 1e220d5db..a243b07f9 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -68,6 +68,7 @@ type FilerOption struct {
SaveToFilerLimit int64
ConcurrentUploadLimit int64
ShowUIDirectoryDelete bool
+ DownloadMaxBytesPs int64
}
type FilerServer struct {
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 04017894b..fcd08a79e 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -238,7 +238,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size)
+ err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil {
stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to stream content %s: %v", r.URL, err)