diff options
| author | chrislu <chris.lu@gmail.com> | 2024-08-10 10:01:57 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-08-10 10:01:57 -0700 |
| commit | 7438648d1cfacd5ca570dd029d1bdb5fd271bd70 (patch) | |
| tree | cf12b49473be0373cb03d83470ddc75708454171 /weed/operation | |
| parent | 49893267e978cc3fda00dc991e00099742fb5a9d (diff) | |
| parent | 63c707f9c1b4dc469ec39c446563c324ce4ccb6f (diff) | |
| download | seaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.tar.xz seaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.zip | |
Merge branch 'master' into mq
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/chunked_file.go | 9 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 14 | ||||
| -rw-r--r-- | weed/operation/needle_parse_test.go | 16 | ||||
| -rw-r--r-- | weed/operation/submit.go | 24 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 86 |
5 files changed, 91 insertions, 58 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index c451420fe..be3e5c98e 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -15,6 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -79,7 +80,7 @@ func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, g for _, ci := range cm.Chunks { fileIds = append(fileIds, ci.Fid) } - results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds) + results, err := DeleteFileIds(masterFn, usePublicUrl, grpcDialOption, fileIds) if err != nil { glog.V(0).Infof("delete %+v: %v", fileIds, err) return fmt.Errorf("chunk delete: %v", err) @@ -95,7 +96,7 @@ func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, g } func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (written int64, e error) { - req, err := http.NewRequest("GET", fileUrl, nil) + req, err := http.NewRequest(http.MethodGet, fileUrl, nil) if err != nil { return written, err } @@ -103,11 +104,11 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) } - resp, err := util.Do(req) + resp, err := util_http.Do(req) if err != nil { return written, err } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) switch resp.StatusCode { case http.StatusRequestedRangeNotSatisfiable: diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index cee80fb47..419223165 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -28,8 +28,8 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { return fid[:commaIndex], fid[commaIndex+1:], nil } -// DeleteFiles batch deletes a list of fileIds -func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { +// DeleteFileIds batch deletes a list of fileIds +func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { lookupFunc := func(vids []string) (results map[string]*LookupResult, err error) { results, err = LookupVolumeIds(masterFn, grpcDialOption, vids) @@ -43,11 +43,11 @@ func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.Di return } - return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) + return DeleteFileIdsWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) } -func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { var ret []*volume_server_pb.DeleteResult @@ -102,7 +102,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str go func(server pb.ServerAddress, fidList []string) { defer wg.Done() - if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil { + if deleteResults, deleteErr := DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil { err = deleteErr } else if deleteResults != nil { resultChan <- deleteResults @@ -120,8 +120,8 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str return ret, err } -// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc -func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { +// DeleteFileIdsAtOneVolumeServer deletes a list of files that is on one volume server via gRpc +func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 07b0153a9..b4bac5976 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -38,15 +38,11 @@ If the content is already compressed, need to know the content size. */ func TestCreateNeedleFromRequest(t *testing.T) { - mc := &MockClient{} - tmp := HttpClient - HttpClient = mc - defer func() { - HttpClient = tmp - }() + mockClient := &MockClient{} + uploader := newUploader(mockClient) { - mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) { assert.Equal(t, nil, err, "upload: %v", err) assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime)) assert.Equal(t, true, n.IsCompressed(), "this should be compressed") @@ -62,7 +58,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - uploadResult, err, data := Upload(bytes.NewReader([]byte(textContent)), uploadOption) + uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption) if len(data) != len(textContent) { t.Errorf("data actual %d expected %d", len(data), len(textContent)) } @@ -73,7 +69,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { } { - mc.needleHandling = func(n *needle.Needle, originalSize int, err error) { + mockClient.needleHandling = func(n *needle.Needle, originalSize int, err error) { assert.Equal(t, nil, err, "upload: %v", err) assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime)) assert.Equal(t, true, n.IsCompressed(), "this should be compressed") @@ -90,7 +86,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - Upload(bytes.NewReader(gzippedData), uploadOption) + uploader.Upload(bytes.NewReader(gzippedData), uploadOption) } /* diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 57bd81b14..516478dbe 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -217,7 +217,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw PairMap: nil, Jwt: jwt, } - ret, e, _ := Upload(fi.Reader, uploadOption) + + uploader, e := NewUploader() + if e != nil { + return 0, e + } + + ret, e, _ := uploader.Upload(fi.Reader, uploadOption) if e != nil { return 0, e } @@ -239,7 +245,13 @@ func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, PairMap: nil, Jwt: jwt, } - uploadResult, uploadError, _ := Upload(reader, uploadOption) + + uploader, uploaderError := NewUploader() + if uploaderError != nil { + return 0, uploaderError + } + + uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption) if uploadError != nil { return 0, uploadError } @@ -265,6 +277,12 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s PairMap: nil, Jwt: jwt, } - _, e = UploadData(buf, uploadOption) + + uploader, e := NewUploader() + if e != nil { + return e + } + + _, e = uploader.UploadData(buf, uploadOption) return e } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index a1df07d7e..8b223e769 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -9,7 +9,7 @@ import ( "io" "mime" "mime/multipart" - "net" + "sync" "net/http" "net/textproto" "path/filepath" @@ -21,6 +21,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" ) type UploadOption struct { @@ -62,29 +64,47 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsN } } +var ( + fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") + uploader *Uploader + uploaderErr error + once sync.Once +) + // HTTPClient interface for testing type HTTPClient interface { Do(req *http.Request) (*http.Response, error) } -var ( - HttpClient HTTPClient -) +// Uploader +type Uploader struct { + httpClient HTTPClient +} -func init() { - HttpClient = &http.Client{Transport: &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: 10 * time.Second, - KeepAlive: 10 * time.Second, - }).DialContext, - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} +func NewUploader() (*Uploader, error) { + once.Do(func () { + // With Dial context + var httpClient *util_http_client.HTTPClient + httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext) + if uploaderErr != nil { + uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr) + } + if httpClient != nil { + uploader = newUploader(httpClient) + } + }) + return uploader, uploaderErr +} + +func newUploader(httpClient HTTPClient) (*Uploader) { + return &Uploader{ + httpClient: httpClient, + } } // UploadWithRetry will retry both assigning volume request and uploading content // The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume. -func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) { doUploadFunc := func() error { var host string @@ -114,7 +134,7 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A uploadOption.Jwt = auth var uploadErr error - uploadResult, uploadErr, data = doUpload(reader, uploadOption) + uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption) return uploadErr } if uploadOption.RetryForever { @@ -130,21 +150,19 @@ func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.A return } -var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") - // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { - uploadResult, err = retriedUploadData(data, option) +func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = uploader.retriedUploadData(data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = doUpload(reader, option) +func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = uploader.doUpload(reader, option) return } -func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes @@ -155,16 +173,16 @@ func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResul return } } - uploadResult, uploadErr := retriedUploadData(data, option) + uploadResult, uploadErr := uploader.retriedUploadData(data, option) return uploadResult, uploadErr, data } -func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { if i > 0 { time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } - uploadResult, err = doUploadData(data, option) + uploadResult, err = uploader.doUploadData(data, option) if err == nil { uploadResult.RetryCount = i return @@ -174,7 +192,7 @@ func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadR return } -func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { contentIsGzipped := option.IsInputCompressed shouldGzipNow := false if !option.IsInputCompressed { @@ -230,7 +248,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult } // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return }, len(encryptedData), &UploadOption{ @@ -251,7 +269,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult uploadResult.Size = uint32(clearDataLen) } else { // upload data - uploadResult, err = upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { _, err = w.Write(data) return }, len(data), &UploadOption{ @@ -277,7 +295,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult return uploadResult, err } -func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { +func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { var body_writer *multipart.Writer var reqReader *bytes.Reader var buf *bytebufferpool.ByteBuffer @@ -325,7 +343,7 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize } else { reqReader = bytes.NewReader(option.BytesBuffer.Bytes()) } - req, postErr := http.NewRequest("POST", option.UploadUrl, reqReader) + req, postErr := http.NewRequest(http.MethodPost, option.UploadUrl, reqReader) if postErr != nil { glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr) return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr) @@ -338,15 +356,15 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } // print("+") - resp, post_err := HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err := uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) if post_err != nil { if strings.Contains(post_err.Error(), "connection reset by peer") || strings.Contains(post_err.Error(), "use of closed network connection") { glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc() - resp, post_err = HttpClient.Do(req) - defer util.CloseResponse(resp) + resp, post_err = uploader.httpClient.Do(req) + defer util_http.CloseResponse(resp) } } if post_err != nil { |
