aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go7
-rw-r--r--weed/server/filer_server_handlers_proxy.go17
-rw-r--r--weed/server/filer_server_handlers_write.go3
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go8
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go8
-rw-r--r--weed/server/filer_server_handlers_write_upload.go8
-rw-r--r--weed/server/master_server.go3
-rw-r--r--weed/server/master_server_handlers_admin.go7
-rw-r--r--weed/server/volume_grpc_remote.go13
-rw-r--r--weed/server/volume_server_handlers_read.go11
-rw-r--r--weed/server/webdav_server.go7
11 files changed, 60 insertions, 32 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 7be2f8a76..e6f6cdb88 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -181,7 +181,12 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
PairMap: pu.PairMap,
Jwt: assignResult.Auth,
}
- uploadResult, err := operation.UploadData(pu.Data, uploadOption)
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ uploadResult, err := uploader.UploadData(pu.Data, uploadOption)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index e04994569..c1a26ca11 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -3,24 +3,13 @@ package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
- "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"math/rand"
"net/http"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
-var (
- client *http.Client
-)
-
-func init() {
- client = &http.Client{Transport: &http.Transport{
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
-}
-
func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
@@ -71,14 +60,14 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
}
}
- proxyResponse, postErr := client.Do(proxyReq)
+ proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq)
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
w.WriteHeader(http.StatusInternalServerError)
return
}
- defer util.CloseResponse(proxyResponse)
+ defer util_http.CloseResponse(proxyResponse)
for k, v := range proxyResponse.Header {
w.Header()[k] = v
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index b186fd34e..ab3988f8c 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -120,7 +121,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
fs.autoChunk(ctx, w, r, contentLength, so)
}
- util.CloseRequest(r)
+ util_http.CloseRequest(r)
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 029fbb7c9..1c7ed0c3c 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -308,8 +308,14 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
PairMap: nil,
Jwt: auth,
}
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return uploaderErr
+ }
+
var uploadErr error
- uploadResult, uploadErr, _ = operation.Upload(reader, uploadOption)
+ uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption)
if uploadErr != nil {
return uploadErr
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 6cf7d65b1..f8d129bf3 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -53,7 +53,13 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
PairMap: pu.PairMap,
Jwt: auth,
}
- uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption)
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr)
+ }
+
+ uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption)
if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError)
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 8c8eba078..d0d1575cf 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -158,7 +158,13 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
PairMap: pairMap,
Jwt: auth,
}
- uploadResult, err, data := operation.Upload(limitedReader, uploadOption)
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return nil, err, []byte{}
+ }
+
+ uploadResult, err, data := uploader.Upload(limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 014bdb7f8..65fa622e7 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -30,6 +30,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const (
@@ -256,7 +257,7 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
}
director(req)
}
- proxy.Transport = util.Transport
+ proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport()
proxy.ServeHTTP(w, r)
}
}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 5e3e42dea..7479b5535 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
@@ -113,11 +114,11 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
location := ms.findVolumeLocation(collection, vid)
if location.Error == "" {
loc := location.Locations[rand.Intn(len(location.Locations))]
- var url string
+ url, _ := util_http.NormalizeUrl(loc.PublicUrl)
if r.URL.RawQuery != "" {
- url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
+ url = url + r.URL.Path + "?" + r.URL.RawQuery
} else {
- url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
+ url = url + r.URL.Path
}
http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 64254b3b8..4452e019b 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -70,10 +70,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
PairMap: nil,
Jwt: security.EncodedJwt(req.Auth),
}
- if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
- if err == nil {
- err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err)
- }
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil && err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr)
+ return
+ }
+
+ if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
}
}(replica.Url)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index ccbd42054..15d639f49 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -27,6 +27,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
@@ -81,7 +82,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
if vs.ReadMode == "proxy" {
// proxy client request to target server
- u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url))
+ rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url)
+ u, _ := url.Parse(rawURL)
r.URL.Host = u.Host
r.URL.Scheme = u.Scheme
request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
@@ -96,13 +98,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- response, err := client.Do(request)
+ response, err := util_http.GetGlobalHttpClient().Do(request)
if err != nil {
glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
InternalError(w)
return
}
- defer util.CloseResponse(response)
+ defer util_http.CloseResponse(response)
// proxy target response to client
for k, vv := range response.Header {
for _, v := range vv {
@@ -116,7 +118,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
} else {
// redirect
- u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
+ rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl)
+ u, _ := url.Parse(rawURL)
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 97d51dad7..f8d964552 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -392,8 +392,13 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
}
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr)
+ return nil, fmt.Errorf("upload data: %v", uploaderErr)
+ }
- fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
+ fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
f.fs,
&filer_pb.AssignVolumeRequest{
Count: 1,