diff options
Diffstat (limited to 'weed/server/volume_server_handlers_read.go')
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 157 |
1 files changed, 96 insertions, 61 deletions
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 15d639f49..9860d6e9e 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -6,6 +6,8 @@ import ( "encoding/json" "errors" "fmt" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" "mime" "net/http" @@ -17,19 +19,18 @@ import ( "time" "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/storage/types" - "github.com/seaweedfs/seaweedfs/weed/util/mem" - "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/images" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) +const reqIsProxied = "proxied" + var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) func NotFound(w http.ResponseWriter) { @@ -42,6 +43,90 @@ func InternalError(w http.ResponseWriter) { w.WriteHeader(http.StatusInternalServerError) } +func (vs *VolumeServer) proxyReqToTargetServer(w http.ResponseWriter, r *http.Request) { + vid, fid, _, _, _ := parseURLPath(r.URL.Path) + volumeId, err := needle.NewVolumeId(vid) + if err != nil { + glog.V(2).Infof("parsing vid %s: %v", r.URL.Path, err) + w.WriteHeader(http.StatusBadRequest) + return + } + lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String()) + if err != nil || len(lookupResult.Locations) <= 0 { + glog.V(0).Infoln("lookup error:", err, r.URL.Path) + NotFound(w) + return + } + var tragetUrl *url.URL + location := fmt.Sprintf("%s:%d", vs.store.Ip, vs.store.Port) + for _, loc := range lookupResult.Locations { + if !strings.Contains(loc.Url, location) { + rawURL, _ := util_http.NormalizeUrl(loc.Url) + tragetUrl, _ = url.Parse(rawURL) + break + } + } + if tragetUrl == nil { + stats.VolumeServerHandlerCounter.WithLabelValues(stats.EmptyReadProxyLoc).Inc() + glog.Errorf("failed lookup target host is empty locations: %+v, %s", lookupResult.Locations, location) + NotFound(w) + return + } + if vs.ReadMode == "proxy" { + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadProxyReq).Inc() + // proxy client request to target server + r.URL.Host = tragetUrl.Host + r.URL.Scheme = tragetUrl.Scheme + r.URL.Query().Add(reqIsProxied, "true") + request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil) + if err != nil { + glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err) + InternalError(w) + return + } + for k, vv := range r.Header { + for _, v := range vv { + request.Header.Add(k, v) + } + } + + response, err := util_http.GetGlobalHttpClient().Do(request) + if err != nil { + stats.VolumeServerHandlerCounter.WithLabelValues(stats.FailedReadProxyReq).Inc() + glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) + InternalError(w) + return + } + defer util_http.CloseResponse(response) + // proxy target response to client + for k, vv := range response.Header { + if k == "Server" { + continue + } + for _, v := range vv { + w.Header().Add(k, v) + } + } + w.WriteHeader(response.StatusCode) + buf := mem.Allocate(128 * 1024) + defer mem.Free(buf) + io.CopyBuffer(w, response.Body, buf) + return + } else { + // redirect + stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadRedirectReq).Inc() + tragetUrl.Path = fmt.Sprintf("%s/%s,%s", tragetUrl.Path, vid, fid) + arg := url.Values{} + if c := r.FormValue("collection"); c != "" { + arg.Set("collection", c) + } + arg.Set(reqIsProxied, "true") + tragetUrl.RawQuery = arg.Encode() + http.Redirect(w, r, tragetUrl.String(), http.StatusMovedPermanently) + return + } +} + func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { n := new(needle.Needle) vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) @@ -73,62 +158,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) NotFound(w) return } - lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String()) - glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) - if err != nil || len(lookupResult.Locations) <= 0 { - glog.V(0).Infoln("lookup error:", err, r.URL.Path) - NotFound(w) - return - } - if vs.ReadMode == "proxy" { - // proxy client request to target server - rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url) - u, _ := url.Parse(rawURL) - r.URL.Host = u.Host - r.URL.Scheme = u.Scheme - request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil) - if err != nil { - glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err) - InternalError(w) - return - } - for k, vv := range r.Header { - for _, v := range vv { - request.Header.Add(k, v) - } - } - - response, err := util_http.GetGlobalHttpClient().Do(request) - if err != nil { - glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) - InternalError(w) - return - } - defer util_http.CloseResponse(response) - // proxy target response to client - for k, vv := range response.Header { - for _, v := range vv { - w.Header().Add(k, v) - } - } - w.WriteHeader(response.StatusCode) - buf := mem.Allocate(128 * 1024) - defer mem.Free(buf) - io.CopyBuffer(w, response.Body, buf) - return - } else { - // redirect - rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl) - u, _ := url.Parse(rawURL) - u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) - arg := url.Values{} - if c := r.FormValue("collection"); c != "" { - arg.Set("collection", c) - } - u.RawQuery = arg.Encode() - http.Redirect(w, r, u.String(), http.StatusMovedPermanently) - return - } + vs.proxyReqToTargetServer(w, r) + return } cookie := n.Cookie @@ -145,14 +176,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) memoryCost = size atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(memoryCost)) } + if hasVolume { count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn) } else if hasEcVolume { count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn) } + defer func() { atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(memoryCost)) - vs.inFlightDownloadDataLimitCond.Signal() + if vs.concurrentDownloadLimit != 0 { + vs.inFlightDownloadDataLimitCond.Broadcast() + } }() if err != nil && err != storage.ErrorDeleted && hasVolume { |
