diff options
| author | vadimartynov <166398828+vadimartynov@users.noreply.github.com> | 2024-07-17 09:14:09 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-16 23:14:09 -0700 |
| commit | 86d92a42b4861d4bb05c58fea9db84d960995545 (patch) | |
| tree | b3b8cefc07fe3d10c0dc0c69120a9019584bd60a /weed/operation/upload_content.go | |
| parent | c6dec11ea556b8be648f372dfa5cbd074c9f631b (diff) | |
| download | seaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.tar.xz seaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.zip | |
Added tls for http clients (#5766)
* Added global http client
* Added Do func for global http client
* Changed the code to use the global http client
* Fix http client in volume uploader
* Fixed pkg name
* Fixed http util funcs
* Fixed http client for bench_filer_upload
* Fixed http client for stress_filer_upload
* Fixed http client for filer_server_handlers_proxy
* Fixed http client for command_fs_merge_volumes
* Fixed http client for command_fs_merge_volumes and command_volume_fsck
* Fixed http client for s3api_server
* Added init global client for main funcs
* Rename global_client to client
* Changed:
- fixed NewHttpClient;
- added CheckIsHttpsClientEnabled func
- updated security.toml in scaffold
* Reduce the visibility of some functions in the util/http/client pkg
* Added the loadSecurityConfig function
* Use util.LoadSecurityConfiguration() in NewHttpClient func
Diffstat (limited to 'weed/operation/upload_content.go')
| -rw-r--r-- | weed/operation/upload_content.go | 84 |
1 files changed, 51 insertions, 33 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 6c6aec1b5..8b223e769 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -9,7 +9,7 @@ import ( "io" "mime" "mime/multipart" - "net" + "sync" "net/http" "net/textproto" "path/filepath" @@ -21,6 +21,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" ) type UploadOption struct { @@ -62,29 +64,47 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsN } } +var ( + fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") + uploader *Uploader + uploaderErr error + once sync.Once +) + // HTTPClient interface for testing type HTTPClient interface { Do(req *http.Request) (*http.Response, error) } -var ( - HttpClient HTTPClient -) +// Uploader +type Uploader struct { + httpClient HTTPClient +} -func init() { - HttpClient = &http.Client{Transport: &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 10 * time.Second, - }).DialContext, - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} +func NewUploader() (*Uploader, error) { + once.Do(func () { + // With Dial context + var httpClient *util_http_client.HTTPClient + httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext) + if uploaderErr != nil { + uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr) + } + if httpClient != nil { + uploader = newUploader(httpClient) + } + }) + return uploader, uploaderErr +} + +func newUploader(httpClient HTTPClient) (*Uploader) { + return &Uploader{ + httpClient: httpClient, + } } // UploadWithRetry will retry both assigning volume request and uploading content // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. -func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { doUploadFunc := func() error { var host string @@ -114,7 +134,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A uploadOption.Jwt = auth var uploadErr error - uploadResult, uploadErr, data = doUpload(reader, uploadOption) + uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption) return uploadErr } if uploadOption.RetryForever { @@ -130,21 +150,19 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A return } -var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") - // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { - uploadResult, err = retriedUploadData(data, option) +func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = uploader.retriedUploadData(data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = doUpload(reader, option) +func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = uploader.doUpload(reader, option) return } -func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes @@ -155,16 +173,16 @@ func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResul return } } - uploadResult, uploadErr := retriedUploadData(data, option) + uploadResult, uploadErr := uploader.retriedUploadData(data, option) return uploadResult, uploadErr, data } -func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { if i > 0 { time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } - uploadResult, err = doUploadData(data, option) + uploadResult, err = uploader.doUploadData(data, option) if err == nil { uploadResult.RetryCount = i return @@ -174,7 +192,7 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR return } -func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { contentIsGzipped := option.IsInputCompressed shouldGzipNow := false if !option.IsInputCompressed { @@ -230,7 +248,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult } // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return }, len(encryptedData), &UploadOption{ @@ -251,7 +269,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult uploadResult.Size = uint32(clearDataLen) } else { // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(data) return }, len(data), &UploadOption{ @@ -277,7 +295,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult return uploadResult, err } -func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { +func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { var body_writer *multipart.Writer var reqReader *bytes.Reader var buf *bytebufferpool.ByteBuffer @@ -338,15 +356,15 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } // print("+") - resp, post_err := HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err := uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) if post_err != nil { if strings.Contains(post_err.Error(), "connection reset by peer") || strings.Contains(post_err.Error(), "use of closed network connection") { glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc() - resp, post_err = HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err = uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) } } if post_err != nil { |
