diff options
| author | Chris Lu <chris.lu@uber.com> | 2021-03-30 02:10:50 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2021-03-30 02:10:53 -0700 |
| commit | ac875976c0731597ae3df324e71b7ec2b8b4b83d (patch) | |
| tree | 44b9f8513d46b9748a4d810b957d27621b9da004 /weed/server/volume_server_handlers.go | |
| parent | a1e18a1384fa9bb6223471a7ac843884056392e2 (diff) | |
| download | seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.tar.xz seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.zip | |
filer, volume: add concurrent upload size limit to avoid OOM
add some back pressure when writes are slow
Diffstat (limited to 'weed/server/volume_server_handlers.go')
| -rw-r--r-- | weed/server/volume_server_handlers.go | 30 |
1 files changed, 30 insertions, 0 deletions
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 7852c950a..4527add44 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -2,7 +2,9 @@ package weed_server import ( "net/http" + "strconv" "strings" + "sync/atomic" "github.com/chrislusf/seaweedfs/weed/util" @@ -40,8 +42,24 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque stats.DeleteRequest() vs.guard.WhiteList(vs.DeleteHandler)(w, r) case "PUT", "POST": + + // wait until in flight data is less than the limit + contentLength := getContentLength(r) + vs.inFlightDataLimitCond.L.Lock() + for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit { + vs.inFlightDataLimitCond.Wait() + } + atomic.AddInt64(&vs.inFlightDataSize, contentLength) + vs.inFlightDataLimitCond.L.Unlock() + defer func() { + atomic.AddInt64(&vs.inFlightDataSize, -contentLength) + vs.inFlightDataLimitCond.Signal() + }() + + // processs uploads stats.WriteRequest() vs.guard.WhiteList(vs.PostHandler)(w, r) + case "OPTIONS": stats.ReadRequest() w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS") @@ -49,6 +67,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque } } +func getContentLength(r *http.Request) int64 { + contentLength := r.Header.Get("Content-Length") + if contentLength != "" { + length, err := strconv.ParseInt(contentLength, 10, 64) + if err != nil { + return 0 + } + return length + } + return 0 +} + func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) if r.Header.Get("Origin") != "" { |
