diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_admin.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_grpc_batch_delete.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_query.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 38 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers.go | 7 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 17 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 2 |
7 files changed, 48 insertions, 24 deletions
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 2bc108a23..898c3da12 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -225,9 +225,9 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv if !hasEcVolume { return nil, fmt.Errorf("volume not found %d", req.VolumeId) } - count, err = vs.store.ReadEcShardNeedle(volumeId, n) + count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil) } else { - count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil) } if err != nil { return nil, err diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 2c445f996..3645ad9c9 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -40,7 +40,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B } else { n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, Status: http.StatusNotFound, diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go index 2f4fab96a..349d10097 100644 --- a/weed/server/volume_grpc_query.go +++ b/weed/server/volume_grpc_query.go @@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_ n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err) return err } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index a0f32700b..034521b4b 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -17,9 +17,12 @@ import ( ) type VolumeServer struct { - inFlightUploadDataSize int64 - concurrentUploadLimit int64 - inFlightUploadDataLimitCond *sync.Cond + inFlightUploadDataSize int64 + inFlightDownloadDataSize int64 + concurrentUploadLimit int64 + concurrentDownloadLimit int64 + inFlightUploadDataLimitCond *sync.Cond + inFlightDownloadDataLimitCond *sync.Cond SeedMasterNodes []string currentMaster string @@ -54,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, compactionMBPerSecond int, fileSizeLimitMB int, concurrentUploadLimit int64, + concurrentDownloadLimit int64, ) *VolumeServer { v := util.GetViper() @@ -67,19 +71,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") vs := &VolumeServer{ - pulseSeconds: pulseSeconds, - dataCenter: dataCenter, - rack: rack, - needleMapKind: needleMapKind, - FixJpgOrientation: fixJpgOrientation, - ReadMode: readMode, - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), - compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, - fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, - isHeartbeating: true, - stopChan: make(chan bool), - inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)), - concurrentUploadLimit: concurrentUploadLimit, + pulseSeconds: pulseSeconds, + dataCenter: dataCenter, + rack: rack, + needleMapKind: needleMapKind, + FixJpgOrientation: fixJpgOrientation, + ReadMode: readMode, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), + compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, + fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, + isHeartbeating: true, + stopChan: make(chan bool), + inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)), + inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)), + concurrentUploadLimit: concurrentUploadLimit, + concurrentDownloadLimit: concurrentDownloadLimit, } vs.SeedMasterNodes = masterNodes diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 917c7cd25..ed7807bb8 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -37,6 +37,11 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque switch r.Method { case "GET", "HEAD": stats.ReadRequest() + vs.inFlightDownloadDataLimitCond.L.Lock() + for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit { + glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit) + vs.inFlightDownloadDataLimitCond.Wait() + } vs.GetOrHeadHandler(w, r) case "DELETE": stats.DeleteRequest() @@ -47,7 +52,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque contentLength := getContentLength(r) vs.inFlightUploadDataLimitCond.L.Lock() for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit { - glog.V(4).Infof("wait because inflight data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit) + glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit) vs.inFlightUploadDataLimitCond.Wait() } atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index cbad9c770..ae3c0b53f 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "mime" "net/http" @@ -12,6 +13,7 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -123,11 +125,22 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } var count int + var needleSize types.Size + onReadSizeFn := func(size types.Size) { + needleSize = size + atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize)) + vs.inFlightDownloadDataLimitCond.L.Unlock() + } if hasVolume { - count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn) } else if hasEcVolume { - count, err = vs.store.ReadEcShardNeedle(volumeId, n) + count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn) } + defer func() { + atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(needleSize)) + vs.inFlightDownloadDataLimitCond.Signal() + }() + if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume { glog.V(4).Infof("read needle: %v", err) // start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 58212e8ff..aeb7d6e65 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -108,7 +108,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } - _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil) + _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil) if ok != nil { m := make(map[string]uint32) m["size"] = 0 |
