diff options
| author | Chris Lu <chris.lu@uber.com> | 2021-03-30 02:10:50 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2021-03-30 02:10:53 -0700 |
| commit | ac875976c0731597ae3df324e71b7ec2b8b4b83d (patch) | |
| tree | 44b9f8513d46b9748a4d810b957d27621b9da004 /weed/server/filer_server_handlers.go | |
| parent | a1e18a1384fa9bb6223471a7ac843884056392e2 (diff) | |
| download | seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.tar.xz seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.zip | |
filer, volume: add concurrent upload size limit to avoid OOM
add some back pressure when writes are slow
Diffstat (limited to 'weed/server/filer_server_handlers.go')
| -rw-r--r-- | weed/server/filer_server_handlers.go | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 3bc0c5d0d..0df00aa5c 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -4,6 +4,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "net/http" "strings" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/stats" @@ -47,18 +48,34 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { fs.DeleteHandler(w, r) } stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) - case "PUT": - stats.FilerRequestCounter.WithLabelValues("put").Inc() - if _, ok := r.URL.Query()["tagging"]; ok { - fs.PutTaggingHandler(w, r) - } else { + case "POST", "PUT": + + // wait until in flight data is less than the limit + contentLength := getContentLength(r) + fs.inFlightDataLimitCond.L.Lock() + for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit { + fs.inFlightDataLimitCond.Wait() + } + atomic.AddInt64(&fs.inFlightDataSize, contentLength) + fs.inFlightDataLimitCond.L.Unlock() + defer func() { + atomic.AddInt64(&fs.inFlightDataSize, -contentLength) + fs.inFlightDataLimitCond.Signal() + }() + + if r.Method == "PUT" { + stats.FilerRequestCounter.WithLabelValues("put").Inc() + if _, ok := r.URL.Query()["tagging"]; ok { + fs.PutTaggingHandler(w, r) + } else { + fs.PostHandler(w, r) + } + stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) + } else { // method == "POST" + stats.FilerRequestCounter.WithLabelValues("post").Inc() fs.PostHandler(w, r) + stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) } - stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) - case "POST": - stats.FilerRequestCounter.WithLabelValues("post").Inc() - fs.PostHandler(w, r) - stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) case "OPTIONS": stats.FilerRequestCounter.WithLabelValues("options").Inc() OptionsHandler(w, r, false) |
