aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_server_handlers.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_server_handlers.go')
-rw-r--r--weed/server/volume_server_handlers.go30
1 files changed, 30 insertions, 0 deletions
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 7852c950a..4527add44 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,9 @@ package weed_server
import (
"net/http"
+ "strconv"
"strings"
+ "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -40,8 +42,24 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ vs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit {
+ vs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&vs.inFlightDataSize, contentLength)
+ vs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&vs.inFlightDataSize, -contentLength)
+ vs.inFlightDataLimitCond.Signal()
+ }()
+
+ // processs uploads
stats.WriteRequest()
vs.guard.WhiteList(vs.PostHandler)(w, r)
+
case "OPTIONS":
stats.ReadRequest()
w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
@@ -49,6 +67,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
}
}
+func getContentLength(r *http.Request) int64 {
+ contentLength := r.Header.Get("Content-Length")
+ if contentLength != "" {
+ length, err := strconv.ParseInt(contentLength, 10, 64)
+ if err != nil {
+ return 0
+ }
+ return length
+ }
+ return 0
+}
+
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
if r.Header.Get("Origin") != "" {