aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/filer.go2
-rw-r--r--weed/server/volume_server.go2
-rw-r--r--weed/server/volume_server_handlers.go25
3 files changed, 18 insertions, 11 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 96150deb3..4f8fd947a 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -103,7 +103,7 @@ func init() {
// start iam on filer
filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service")
- filerIamOptions.ip = f.ip
+ filerIamOptions.ip = cmdFiler.Flag.String("iam.ip", *f.ip, "iam server http listen ip address")
filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port")
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 4199ae36b..dcd27673c 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -23,7 +23,6 @@ type VolumeServer struct {
inFlightDownloadDataSize int64
concurrentUploadLimit int64
concurrentDownloadLimit int64
- inFlightUploadDataLimitCond *sync.Cond
inFlightDownloadDataLimitCond *sync.Cond
SeedMasterNodes []pb.ServerAddress
@@ -84,7 +83,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
isHeartbeating: true,
stopChan: make(chan bool),
- inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
concurrentUploadLimit: concurrentUploadLimit,
concurrentDownloadLimit: concurrentDownloadLimit,
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 510902cf0..49bc297fb 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "fmt"
"net/http"
"strconv"
"strings"
@@ -39,8 +40,14 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.ReadRequest()
vs.inFlightDownloadDataLimitCond.L.Lock()
for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit {
- glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
- vs.inFlightDownloadDataLimitCond.Wait()
+ select {
+ case <-r.Context().Done():
+ glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err())
+ return
+ default:
+ glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
+ vs.inFlightDownloadDataLimitCond.Wait()
+ }
}
vs.inFlightDownloadDataLimitCond.L.Unlock()
vs.GetOrHeadHandler(w, r)
@@ -51,16 +58,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
// wait until in flight data is less than the limit
contentLength := getContentLength(r)
- vs.inFlightUploadDataLimitCond.L.Lock()
- for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
- glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
- vs.inFlightUploadDataLimitCond.Wait()
+
+ // exclude the replication from the concurrentUploadLimitMB
+ if vs.concurrentUploadLimit != 0 && r.URL.Query().Get("type") != "replicate" &&
+ atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
+ err := fmt.Errorf("reject because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ glog.V(1).Infof("too many requests: %v", err)
+ writeJsonError(w, r, http.StatusTooManyRequests, err)
+ return
}
- vs.inFlightUploadDataLimitCond.L.Unlock()
atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
defer func() {
atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
- vs.inFlightUploadDataLimitCond.Signal()
}()
// processs uploads