aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@uber.com>2021-03-30 02:10:50 -0700
committerChris Lu <chris.lu@uber.com>2021-03-30 02:10:53 -0700
commitac875976c0731597ae3df324e71b7ec2b8b4b83d (patch)
tree44b9f8513d46b9748a4d810b957d27621b9da004 /weed/server
parenta1e18a1384fa9bb6223471a7ac843884056392e2 (diff)
downloadseaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.tar.xz
seaweedfs-ac875976c0731597ae3df324e71b7ec2b8b4b83d.zip
filer, volume: add concurrent upload size limit to avoid OOM
add some back pressure when writes are slow
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_server.go43
-rw-r--r--weed/server/filer_server_handlers.go37
-rw-r--r--weed/server/volume_server.go8
-rw-r--r--weed/server/volume_server_handlers.go30
4 files changed, 89 insertions, 29 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 9e0770afa..a876b6d83 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -45,22 +45,23 @@ import (
)
type FilerOption struct {
- Masters []string
- Collection string
- DefaultReplication string
- DisableDirListing bool
- MaxMB int
- DirListingLimit int
- DataCenter string
- Rack string
- DefaultLevelDbDir string
- DisableHttp bool
- Host string
- Port uint32
- recursiveDelete bool
- Cipher bool
- SaveToFilerLimit int
- Filers []string
+ Masters []string
+ Collection string
+ DefaultReplication string
+ DisableDirListing bool
+ MaxMB int
+ DirListingLimit int
+ DataCenter string
+ Rack string
+ DefaultLevelDbDir string
+ DisableHttp bool
+ Host string
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
+ SaveToFilerLimit int
+ Filers []string
+ ConcurrentUploadLimit int64
}
type FilerServer struct {
@@ -79,14 +80,18 @@ type FilerServer struct {
brokers map[string]map[string]bool
brokersLock sync.Mutex
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
- brokers: make(map[string]map[string]bool),
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 3bc0c5d0d..0df00aa5c 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
@@ -47,18 +48,34 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
fs.DeleteHandler(w, r)
}
stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
- case "PUT":
- stats.FilerRequestCounter.WithLabelValues("put").Inc()
- if _, ok := r.URL.Query()["tagging"]; ok {
- fs.PutTaggingHandler(w, r)
- } else {
+ case "POST", "PUT":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ fs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+ fs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&fs.inFlightDataSize, contentLength)
+ fs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
+ fs.inFlightDataLimitCond.Signal()
+ }()
+
+ if r.Method == "PUT" {
+ stats.FilerRequestCounter.WithLabelValues("put").Inc()
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.PutTaggingHandler(w, r)
+ } else {
+ fs.PostHandler(w, r)
+ }
+ stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
+ } else { // method == "POST"
+ stats.FilerRequestCounter.WithLabelValues("post").Inc()
fs.PostHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}
- stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
- case "POST":
- stats.FilerRequestCounter.WithLabelValues("post").Inc()
- fs.PostHandler(w, r)
- stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
case "OPTIONS":
stats.FilerRequestCounter.WithLabelValues("options").Inc()
OptionsHandler(w, r, false)
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index e496b1ce2..e11d607a4 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
+ "sync"
"google.golang.org/grpc"
@@ -34,6 +35,10 @@ type VolumeServer struct {
fileSizeLimitBytes int64
isHeartbeating bool
stopChan chan bool
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
+ concurrentUploadLimit int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -48,6 +53,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
readRedirect bool,
compactionMBPerSecond int,
fileSizeLimitMB int,
+ concurrentUploadLimit int64,
) *VolumeServer {
v := util.GetViper()
@@ -72,6 +78,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
isHeartbeating: true,
stopChan: make(chan bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ concurrentUploadLimit: concurrentUploadLimit,
}
vs.SeedMasterNodes = masterNodes
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 7852c950a..4527add44 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,9 @@ package weed_server
import (
"net/http"
+ "strconv"
"strings"
+ "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -40,8 +42,24 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ vs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit {
+ vs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&vs.inFlightDataSize, contentLength)
+ vs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&vs.inFlightDataSize, -contentLength)
+ vs.inFlightDataLimitCond.Signal()
+ }()
+
+ // processs uploads
stats.WriteRequest()
vs.guard.WhiteList(vs.PostHandler)(w, r)
+
case "OPTIONS":
stats.ReadRequest()
w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
@@ -49,6 +67,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
}
}
+func getContentLength(r *http.Request) int64 {
+ contentLength := r.Header.Get("Content-Length")
+ if contentLength != "" {
+ length, err := strconv.ParseInt(contentLength, 10, 64)
+ if err != nil {
+ return 0
+ }
+ return length
+ }
+ return 0
+}
+
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
if r.Header.Get("Origin") != "" {