diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/chunked_file.go | 5 | ||||
| -rw-r--r-- | weed/operation/needle_parse_test.go | 16 | ||||
| -rw-r--r-- | weed/operation/submit.go | 24 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 84 |
4 files changed, 81 insertions, 48 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 02faf9904..50313a670 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -103,11 +104,11 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) } - resp, err := util.Do(req) + resp, err := util_http.Do(req) if err != nil { return written, err } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) switch resp.StatusCode { case http.StatusRequestedRangeNotSatisfiable: diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 07b0153a9..b4bac5976 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -38,15 +38,11 @@ If the content is already compressed, need to know the content size. */ func TestCreateNeedleFromRequest(t *testing.T) { - mc := &MockClient{} - tmp := HttpClient - HttpClient = mc - defer func() { - HttpClient = tmp - }() + mockClient := &MockClient{} + uploader := newUploader(mockClient) { - mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) { assert.Equal(t, nil, err, "upload: %v", err) assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime)) assert.Equal(t, true, n.IsCompressed(), "this should be compressed") @@ -62,7 +58,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption) + uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption) if len(data) != len(textContent) { t.Errorf("data actual %d expected %d", len(data), len(textContent)) } @@ -73,7 +69,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { } { - mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) { assert.Equal(t, nil, err, "upload: %v", err) assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime)) assert.Equal(t, true, n.IsCompressed(), "this should be compressed") @@ -90,7 +86,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - Upload(bytes.NewReader(gzippedData), uploadOption) + uploader.Upload(bytes.NewReader(gzippedData), uploadOption) } /* diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 57bd81b14..516478dbe 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -217,7 +217,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw PairMap: nil, Jwt: jwt, } - ret, e, _ := Upload(fi.Reader, uploadOption) + + uploader, e := NewUploader() + if e != nil { + return 0, e + } + + ret, e, _ := uploader.Upload(fi.Reader, uploadOption) if e != nil { return 0, e } @@ -239,7 +245,13 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, PairMap: nil, Jwt: jwt, } - uploadResult, uploadError, _ := Upload(reader, uploadOption) + + uploader, uploaderError := NewUploader() + if uploaderError != nil { + return 0, uploaderError + } + + uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption) if uploadError != nil { return 0, uploadError } @@ -265,6 +277,12 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s PairMap: nil, Jwt: jwt, } - _, e = UploadData(buf, uploadOption) + + uploader, e := NewUploader() + if e != nil { + return e + } + + _, e = uploader.UploadData(buf, uploadOption) return e } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 6c6aec1b5..8b223e769 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -9,7 +9,7 @@ import ( "io" "mime" "mime/multipart" - "net" + "sync" "net/http" "net/textproto" "path/filepath" @@ -21,6 +21,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" ) type UploadOption struct { @@ -62,29 +64,47 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsN } } +var ( + fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") + uploader *Uploader + uploaderErr error + once sync.Once +) + // HTTPClient interface for testing type HTTPClient interface { Do(req *http.Request) (*http.Response, error) } -var ( - HttpClient HTTPClient -) +// Uploader +type Uploader struct { + httpClient HTTPClient +} -func init() { - HttpClient = &http.Client{Transport: &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 10 * time.Second, - }).DialContext, - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} +func NewUploader() (*Uploader, error) { + once.Do(func () { + // With Dial context + var httpClient *util_http_client.HTTPClient + httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext) + if uploaderErr != nil { + uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr) + } + if httpClient != nil { + uploader = newUploader(httpClient) + } + }) + return uploader, uploaderErr +} + +func newUploader(httpClient HTTPClient) (*Uploader) { + return &Uploader{ + httpClient: httpClient, + } } // UploadWithRetry will retry both assigning volume request and uploading content // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. -func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { doUploadFunc := func() error { var host string @@ -114,7 +134,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A uploadOption.Jwt = auth var uploadErr error - uploadResult, uploadErr, data = doUpload(reader, uploadOption) + uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption) return uploadErr } if uploadOption.RetryForever { @@ -130,21 +150,19 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A return } -var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") - // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { - uploadResult, err = retriedUploadData(data, option) +func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = uploader.retriedUploadData(data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = doUpload(reader, option) +func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = uploader.doUpload(reader, option) return } -func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes @@ -155,16 +173,16 @@ func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResul return } } - uploadResult, uploadErr := retriedUploadData(data, option) + uploadResult, uploadErr := uploader.retriedUploadData(data, option) return uploadResult, uploadErr, data } -func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { if i > 0 { time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } - uploadResult, err = doUploadData(data, option) + uploadResult, err = uploader.doUploadData(data, option) if err == nil { uploadResult.RetryCount = i return @@ -174,7 +192,7 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR return } -func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { contentIsGzipped := option.IsInputCompressed shouldGzipNow := false if !option.IsInputCompressed { @@ -230,7 +248,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult } // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return }, len(encryptedData), &UploadOption{ @@ -251,7 +269,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult uploadResult.Size = uint32(clearDataLen) } else { // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(data) return }, len(data), &UploadOption{ @@ -277,7 +295,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult return uploadResult, err } -func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { +func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { var body_writer *multipart.Writer var reqReader *bytes.Reader var buf *bytebufferpool.ByteBuffer @@ -338,15 +356,15 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } // print("+") - resp, post_err := HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err := uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) if post_err != nil { if strings.Contains(post_err.Error(), "connection reset by peer") || strings.Contains(post_err.Error(), "use of closed network connection") { glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc() - resp, post_err = HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err = uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) } } if post_err != nil { |
