aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
authorvadimartynov <166398828+vadimartynov@users.noreply.github.com>2024-07-17 09:14:09 +0300
committerGitHub <noreply@github.com>2024-07-16 23:14:09 -0700
commit86d92a42b4861d4bb05c58fea9db84d960995545 (patch)
treeb3b8cefc07fe3d10c0dc0c69120a9019584bd60a /weed/operation
parentc6dec11ea556b8be648f372dfa5cbd074c9f631b (diff)
downloadseaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.tar.xz
seaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.zip
Added tls for http clients (#5766)
* Added global http client * Added Do func for global http client * Changed the code to use the global http client * Fix http client in volume uploader * Fixed pkg name * Fixed http util funcs * Fixed http client for bench_filer_upload * Fixed http client for stress_filer_upload * Fixed http client for filer_server_handlers_proxy * Fixed http client for command_fs_merge_volumes * Fixed http client for command_fs_merge_volumes and command_volume_fsck * Fixed http client for s3api_server * Added init global client for main funcs * Rename global_client to client * Changed: - fixed NewHttpClient; - added CheckIsHttpsClientEnabled func - updated security.toml in scaffold * Reduce the visibility of some functions in the util/http/client pkg * Added the loadSecurityConfig function * Use util.LoadSecurityConfiguration() in NewHttpClient func
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/chunked_file.go5
-rw-r--r--weed/operation/needle_parse_test.go16
-rw-r--r--weed/operation/submit.go24
-rw-r--r--weed/operation/upload_content.go84
4 files changed, 81 insertions, 48 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 02faf9904..50313a670 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -15,6 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -103,11 +104,11 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
}
- resp, err := util.Do(req)
+ resp, err := util_http.Do(req)
if err != nil {
return written, err
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
switch resp.StatusCode {
case http.StatusRequestedRangeNotSatisfiable:
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
index 07b0153a9..b4bac5976 100644
--- a/weed/operation/needle_parse_test.go
+++ b/weed/operation/needle_parse_test.go
@@ -38,15 +38,11 @@ If the content is already compressed, need to know the content size.
*/
func TestCreateNeedleFromRequest(t *testing.T) {
- mc := &MockClient{}
- tmp := HttpClient
- HttpClient = mc
- defer func() {
- HttpClient = tmp
- }()
+ mockClient := &MockClient{}
+ uploader := newUploader(mockClient)
{
- mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) {
assert.Equal(t, nil, err, "upload: %v", err)
assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime))
assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
@@ -62,7 +58,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption)
+ uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption)
if len(data) != len(textContent) {
t.Errorf("data actual %d expected %d", len(data), len(textContent))
}
@@ -73,7 +69,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
}
{
- mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) {
assert.Equal(t, nil, err, "upload: %v", err)
assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
@@ -90,7 +86,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- Upload(bytes.NewReader(gzippedData), uploadOption)
+ uploader.Upload(bytes.NewReader(gzippedData), uploadOption)
}
/*
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 57bd81b14..516478dbe 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -217,7 +217,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
PairMap: nil,
Jwt: jwt,
}
- ret, e, _ := Upload(fi.Reader, uploadOption)
+
+ uploader, e := NewUploader()
+ if e != nil {
+ return 0, e
+ }
+
+ ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
if e != nil {
return 0, e
}
@@ -239,7 +245,13 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
PairMap: nil,
Jwt: jwt,
}
- uploadResult, uploadError, _ := Upload(reader, uploadOption)
+
+ uploader, uploaderError := NewUploader()
+ if uploaderError != nil {
+ return 0, uploaderError
+ }
+
+ uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
if uploadError != nil {
return 0, uploadError
}
@@ -265,6 +277,12 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
PairMap: nil,
Jwt: jwt,
}
- _, e = UploadData(buf, uploadOption)
+
+ uploader, e := NewUploader()
+ if e != nil {
+ return e
+ }
+
+ _, e = uploader.UploadData(buf, uploadOption)
return e
}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 6c6aec1b5..8b223e769 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -9,7 +9,7 @@ import (
"io"
"mime"
"mime/multipart"
- "net"
+ "sync"
"net/http"
"net/textproto"
"path/filepath"
@@ -21,6 +21,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
)
type UploadOption struct {
@@ -62,29 +64,47 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsN
}
}
+var (
+ fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
+ uploader *Uploader
+ uploaderErr error
+ once sync.Once
+)
+
// HTTPClient interface for testing
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
-var (
- HttpClient HTTPClient
-)
+// Uploader
+type Uploader struct {
+ httpClient HTTPClient
+}
-func init() {
- HttpClient = &http.Client{Transport: &http.Transport{
- DialContext: (&net.Dialer{
- Timeout: 10 * time.Second,
- KeepAlive: 10 * time.Second,
- }).DialContext,
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
+func NewUploader() (*Uploader, error) {
+ once.Do(func () {
+ // With Dial context
+ var httpClient *util_http_client.HTTPClient
+ httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext)
+ if uploaderErr != nil {
+ uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr)
+ }
+ if httpClient != nil {
+ uploader = newUploader(httpClient)
+ }
+ })
+ return uploader, uploaderErr
+}
+
+func newUploader(httpClient HTTPClient) (*Uploader) {
+ return &Uploader{
+ httpClient: httpClient,
+ }
}
// UploadWithRetry will retry both assigning volume request and uploading content
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
-func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
doUploadFunc := func() error {
var host string
@@ -114,7 +134,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
uploadOption.Jwt = auth
var uploadErr error
- uploadResult, uploadErr, data = doUpload(reader, uploadOption)
+ uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
@@ -130,21 +150,19 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A
return
}
-var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
-
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
- uploadResult, err = retriedUploadData(data, option)
+func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+ uploadResult, err = uploader.retriedUploadData(data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
- uploadResult, err, data = doUpload(reader, option)
+func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = uploader.doUpload(reader, option)
return
}
-func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
@@ -155,16 +173,16 @@ func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResul
return
}
}
- uploadResult, uploadErr := retriedUploadData(data, option)
+ uploadResult, uploadErr := uploader.retriedUploadData(data, option)
return uploadResult, uploadErr, data
}
-func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
- uploadResult, err = doUploadData(data, option)
+ uploadResult, err = uploader.doUploadData(data, option)
if err == nil {
uploadResult.RetryCount = i
return
@@ -174,7 +192,7 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR
return
}
-func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
if !option.IsInputCompressed {
@@ -230,7 +248,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
}
// upload data
- uploadResult, err = upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
}, len(encryptedData), &UploadOption{
@@ -251,7 +269,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
uploadResult.Size = uint32(clearDataLen)
} else {
// upload data
- uploadResult, err = upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, len(data), &UploadOption{
@@ -277,7 +295,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
return uploadResult, err
}
-func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
+func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer
var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer
@@ -338,15 +356,15 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
// print("+")
- resp, post_err := HttpClient.Do(req)
- defer util.CloseResponse(resp)
+ resp, post_err := uploader.httpClient.Do(req)
+ defer util_http.CloseResponse(resp)
if post_err != nil {
if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") {
glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr)
stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
- resp, post_err = HttpClient.Do(req)
- defer util.CloseResponse(resp)
+ resp, post_err = uploader.httpClient.Do(req)
+ defer util_http.CloseResponse(resp)
}
}
if post_err != nil {