diff options
Diffstat (limited to 'weed/operation/upload_content.go')
| -rw-r--r-- | weed/operation/upload_content.go | 29 |
1 files changed, 16 insertions, 13 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 0cf6bf7cf..6d910674b 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi uploadOption.Jwt = auth var uploadErr error - uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption) + uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption) return uploadErr } if uploadOption.RetryForever { @@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi } // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { - uploadResult, err = uploader.retriedUploadData(data, option) +func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = uploader.retriedUploadData(ctx, data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = uploader.doUpload(reader, option) +func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = uploader.doUpload(ctx, reader, option) return } -func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes @@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo return } } - uploadResult, uploadErr := uploader.retriedUploadData(data, option) + uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option) return uploadResult, uploadErr, data } -func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { if i > 0 { time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } - uploadResult, err = uploader.doUploadData(data, option) + uploadResult, err = uploader.doUploadData(ctx, data, option) if err == nil { uploadResult.RetryCount = i return @@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) ( return } -func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { contentIsGzipped := option.IsInputCompressed shouldGzipNow := false if !option.IsInputCompressed { @@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa } // upload data - uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return }, len(encryptedData), &UploadOption{ @@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa } } else { // upload data - uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) { _, err = w.Write(data) return }, len(data), &UploadOption{ @@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa return uploadResult, err } -func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { +func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { var body_writer *multipart.Writer var reqReader *bytes.Reader var buf *bytebufferpool.ByteBuffer @@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er if option.Jwt != "" { req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } + + util.ReqWithRequestId(req, ctx) + // print("+") resp, post_err := uploader.httpClient.Do(req) defer util_http.CloseResponse(resp) |
