aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@uber.com>2021-03-30 02:10:50 -0700
committerChris Lu <chris.lu@uber.com>2021-03-30 02:10:53 -0700
commitac875976c0731597ae3df324e71b7ec2b8b4b83d (patch)
tree44b9f8513d46b9748a4d810b957d27621b9da004 /weed/server/filer_server_handlers.go
parenta1e18a1384fa9bb6223471a7ac843884056392e2 (diff)
downloadseaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.tar.xz
seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.zip
filer, volume: add concurrent upload size limit to avoid OOM
add some back pressure when writes are slow
Diffstat (limited to 'weed/server/filer_server_handlers.go')
-rw-r--r--weed/server/filer_server_handlers.go37
1 files changed, 27 insertions, 10 deletions
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 3bc0c5d0d..0df00aa5c 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
@@ -47,18 +48,34 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
fs.DeleteHandler(w, r)
}
stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
- case "PUT":
- stats.FilerRequestCounter.WithLabelValues("put").Inc()
- if _, ok := r.URL.Query()["tagging"]; ok {
- fs.PutTaggingHandler(w, r)
- } else {
+ case "POST", "PUT":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ fs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+ fs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&fs.inFlightDataSize, contentLength)
+ fs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
+ fs.inFlightDataLimitCond.Signal()
+ }()
+
+ if r.Method == "PUT" {
+ stats.FilerRequestCounter.WithLabelValues("put").Inc()
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.PutTaggingHandler(w, r)
+ } else {
+ fs.PostHandler(w, r)
+ }
+ stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
+ } else { // method == "POST"
+ stats.FilerRequestCounter.WithLabelValues("post").Inc()
fs.PostHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}
- stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
- case "POST":
- stats.FilerRequestCounter.WithLabelValues("post").Inc()
- fs.PostHandler(w, r)
- stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
case "OPTIONS":
stats.FilerRequestCounter.WithLabelValues("options").Inc()
OptionsHandler(w, r, false)