aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_server_handlers_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_server_handlers_read.go')
-rw-r--r--weed/server/volume_server_handlers_read.go157
1 files changed, 96 insertions, 61 deletions
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 15d639f49..9860d6e9e 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"mime"
"net/http"
@@ -17,19 +19,18 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/seaweedfs/seaweedfs/weed/util/mem"
-
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/images"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
+const reqIsProxied = "proxied"
+
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func NotFound(w http.ResponseWriter) {
@@ -42,6 +43,90 @@ func InternalError(w http.ResponseWriter) {
w.WriteHeader(http.StatusInternalServerError)
}
+func (vs *VolumeServer) proxyReqToTargetServer(w http.ResponseWriter, r *http.Request) {
+ vid, fid, _, _, _ := parseURLPath(r.URL.Path)
+ volumeId, err := needle.NewVolumeId(vid)
+ if err != nil {
+ glog.V(2).Infof("parsing vid %s: %v", r.URL.Path, err)
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String())
+ if err != nil || len(lookupResult.Locations) <= 0 {
+ glog.V(0).Infoln("lookup error:", err, r.URL.Path)
+ NotFound(w)
+ return
+ }
+ var tragetUrl *url.URL
+ location := fmt.Sprintf("%s:%d", vs.store.Ip, vs.store.Port)
+ for _, loc := range lookupResult.Locations {
+ if !strings.Contains(loc.Url, location) {
+ rawURL, _ := util_http.NormalizeUrl(loc.Url)
+ tragetUrl, _ = url.Parse(rawURL)
+ break
+ }
+ }
+ if tragetUrl == nil {
+ stats.VolumeServerHandlerCounter.WithLabelValues(stats.EmptyReadProxyLoc).Inc()
+ glog.Errorf("failed lookup target host is empty locations: %+v, %s", lookupResult.Locations, location)
+ NotFound(w)
+ return
+ }
+ if vs.ReadMode == "proxy" {
+ stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadProxyReq).Inc()
+ // proxy client request to target server
+ r.URL.Host = tragetUrl.Host
+ r.URL.Scheme = tragetUrl.Scheme
+ r.URL.Query().Add(reqIsProxied, "true")
+ request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
+ if err != nil {
+ glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
+ InternalError(w)
+ return
+ }
+ for k, vv := range r.Header {
+ for _, v := range vv {
+ request.Header.Add(k, v)
+ }
+ }
+
+ response, err := util_http.GetGlobalHttpClient().Do(request)
+ if err != nil {
+ stats.VolumeServerHandlerCounter.WithLabelValues(stats.FailedReadProxyReq).Inc()
+ glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
+ InternalError(w)
+ return
+ }
+ defer util_http.CloseResponse(response)
+ // proxy target response to client
+ for k, vv := range response.Header {
+ if k == "Server" {
+ continue
+ }
+ for _, v := range vv {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(response.StatusCode)
+ buf := mem.Allocate(128 * 1024)
+ defer mem.Free(buf)
+ io.CopyBuffer(w, response.Body, buf)
+ return
+ } else {
+ // redirect
+ stats.VolumeServerHandlerCounter.WithLabelValues(stats.ReadRedirectReq).Inc()
+ tragetUrl.Path = fmt.Sprintf("%s/%s,%s", tragetUrl.Path, vid, fid)
+ arg := url.Values{}
+ if c := r.FormValue("collection"); c != "" {
+ arg.Set("collection", c)
+ }
+ arg.Set(reqIsProxied, "true")
+ tragetUrl.RawQuery = arg.Encode()
+ http.Redirect(w, r, tragetUrl.String(), http.StatusMovedPermanently)
+ return
+ }
+}
+
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
@@ -73,62 +158,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
NotFound(w)
return
}
- lookupResult, err := operation.LookupVolumeId(vs.GetMaster, vs.grpcDialOption, volumeId.String())
- glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
- if err != nil || len(lookupResult.Locations) <= 0 {
- glog.V(0).Infoln("lookup error:", err, r.URL.Path)
- NotFound(w)
- return
- }
- if vs.ReadMode == "proxy" {
- // proxy client request to target server
- rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url)
- u, _ := url.Parse(rawURL)
- r.URL.Host = u.Host
- r.URL.Scheme = u.Scheme
- request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
- if err != nil {
- glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
- InternalError(w)
- return
- }
- for k, vv := range r.Header {
- for _, v := range vv {
- request.Header.Add(k, v)
- }
- }
-
- response, err := util_http.GetGlobalHttpClient().Do(request)
- if err != nil {
- glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
- InternalError(w)
- return
- }
- defer util_http.CloseResponse(response)
- // proxy target response to client
- for k, vv := range response.Header {
- for _, v := range vv {
- w.Header().Add(k, v)
- }
- }
- w.WriteHeader(response.StatusCode)
- buf := mem.Allocate(128 * 1024)
- defer mem.Free(buf)
- io.CopyBuffer(w, response.Body, buf)
- return
- } else {
- // redirect
- rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl)
- u, _ := url.Parse(rawURL)
- u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
- arg := url.Values{}
- if c := r.FormValue("collection"); c != "" {
- arg.Set("collection", c)
- }
- u.RawQuery = arg.Encode()
- http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
- return
- }
+ vs.proxyReqToTargetServer(w, r)
+ return
}
cookie := n.Cookie
@@ -145,14 +176,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
memoryCost = size
atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(memoryCost))
}
+
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn)
}
+
defer func() {
atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(memoryCost))
- vs.inFlightDownloadDataLimitCond.Signal()
+ if vs.concurrentDownloadLimit != 0 {
+ vs.inFlightDownloadDataLimitCond.Broadcast()
+ }
}()
if err != nil && err != storage.ErrorDeleted && hasVolume {