diff options
| author | vadimartynov <166398828+vadimartynov@users.noreply.github.com> | 2024-07-17 09:14:09 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-16 23:14:09 -0700 |
| commit | 86d92a42b4861d4bb05c58fea9db84d960995545 (patch) | |
| tree | b3b8cefc07fe3d10c0dc0c69120a9019584bd60a /weed/server | |
| parent | c6dec11ea556b8be648f372dfa5cbd074c9f631b (diff) | |
| download | seaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.tar.xz seaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.zip | |
Added tls for http clients (#5766)
* Added global http client
* Added Do func for global http client
* Changed the code to use the global http client
* Fix http client in volume uploader
* Fixed pkg name
* Fixed http util funcs
* Fixed http client for bench_filer_upload
* Fixed http client for stress_filer_upload
* Fixed http client for filer_server_handlers_proxy
* Fixed http client for command_fs_merge_volumes
* Fixed http client for command_fs_merge_volumes and command_volume_fsck
* Fixed http client for s3api_server
* Added init global client for main funcs
* Rename global_client to client
* Changed:
- fixed NewHttpClient;
- added CheckIsHttpsClientEnabled func
- updated security.toml in scaffold
* Reduce the visibility of some functions in the util/http/client pkg
* Added the loadSecurityConfig function
* Use util.LoadSecurityConfiguration() in NewHttpClient func
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 7 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_proxy.go | 17 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 3 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 8 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 8 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 8 | ||||
| -rw-r--r-- | weed/server/master_server.go | 3 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 7 | ||||
| -rw-r--r-- | weed/server/volume_grpc_remote.go | 13 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 11 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 7 |
11 files changed, 60 insertions, 32 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 7be2f8a76..e6f6cdb88 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -181,7 +181,12 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope PairMap: pu.PairMap, Jwt: assignResult.Auth, } - uploadResult, err := operation.UploadData(pu.Data, uploadOption) + uploader, err := operation.NewUploader() + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + uploadResult, err := uploader.UploadData(pu.Data, uploadOption) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index e04994569..c1a26ca11 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -3,24 +3,13 @@ package weed_server import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/security" - "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" "math/rand" "net/http" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -var ( - client *http.Client -) - -func init() { - client = &http.Client{Transport: &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} -} - func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) { encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite) @@ -71,14 +60,14 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques } } - proxyResponse, postErr := client.Do(proxyReq) + proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq) if postErr != nil { glog.Errorf("post to filer: %v", postErr) w.WriteHeader(http.StatusInternalServerError) return } - defer util.CloseResponse(proxyResponse) + defer util_http.CloseResponse(proxyResponse) for k, v := range proxyResponse.Header { w.Header()[k] = v diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index b186fd34e..ab3988f8c 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -120,7 +121,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte fs.autoChunk(ctx, w, r, contentLength, so) } - util.CloseRequest(r) + util_http.CloseRequest(r) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 029fbb7c9..1c7ed0c3c 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -308,8 +308,14 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs PairMap: nil, Jwt: auth, } + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return uploaderErr + } + var uploadErr error - uploadResult, uploadErr, _ = operation.Upload(reader, uploadOption) + uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption) if uploadErr != nil { return uploadErr } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 6cf7d65b1..f8d129bf3 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -53,7 +53,13 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht PairMap: pu.PairMap, Jwt: auth, } - uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption) + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr) + } + + uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption) if uploadError != nil { return nil, fmt.Errorf("upload to volume server: %v", uploadError) } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 8c8eba078..d0d1575cf 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -158,7 +158,13 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil PairMap: pairMap, Jwt: auth, } - uploadResult, err, data := operation.Upload(limitedReader, uploadOption) + + uploader, err := operation.NewUploader() + if err != nil { + return nil, err, []byte{} + } + + uploadResult, err, data := uploader.Upload(limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 014bdb7f8..65fa622e7 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -30,6 +30,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/wdclient" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) const ( @@ -256,7 +257,7 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { } director(req) } - proxy.Transport = util.Transport + proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport() proxy.ServeHTTP(w, r) } } diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 5e3e42dea..7479b5535 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -113,11 +114,11 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) location := ms.findVolumeLocation(collection, vid) if location.Error == "" { loc := location.Locations[rand.Intn(len(location.Locations))] - var url string + url, _ := util_http.NormalizeUrl(loc.PublicUrl) if r.URL.RawQuery != "" { - url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery + url = url + r.URL.Path + "?" + r.URL.RawQuery } else { - url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + url = url + r.URL.Path } http.Redirect(w, r, url, http.StatusPermanentRedirect) } else { diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 64254b3b8..4452e019b 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -70,10 +70,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser PairMap: nil, Jwt: security.EncodedJwt(req.Auth), } - if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil { - if err == nil { - err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err) - } + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil && err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr) + return + } + + if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr) } }(replica.Url) } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index ccbd42054..15d639f49 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -27,6 +27,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) @@ -81,7 +82,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } if vs.ReadMode == "proxy" { // proxy client request to target server - u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url)) + rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url) + u, _ := url.Parse(rawURL) r.URL.Host = u.Host r.URL.Scheme = u.Scheme request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil) @@ -96,13 +98,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - response, err := client.Do(request) + response, err := util_http.GetGlobalHttpClient().Do(request) if err != nil { glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) InternalError(w) return } - defer util.CloseResponse(response) + defer util_http.CloseResponse(response) // proxy target response to client for k, vv := range response.Header { for _, v := range vv { @@ -116,7 +118,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } else { // redirect - u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) + rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl) + u, _ := url.Parse(rawURL) u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) arg := url.Values{} if c := r.FormValue("collection"); c != "" { diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 97d51dad7..f8d964552 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -392,8 +392,13 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, } func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr) + return nil, fmt.Errorf("upload data: %v", uploaderErr) + } - fileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry( f.fs, &filer_pb.AssignVolumeRequest{ Count: 1, |
