aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/upload_content.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/upload_content.go')
-rw-r--r--weed/operation/upload_content.go29
1 files changed, 16 insertions, 13 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 0cf6bf7cf..6d910674b 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
uploadOption.Jwt = auth
var uploadErr error
- uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
+ uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
@@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
}
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
- uploadResult, err = uploader.retriedUploadData(data, option)
+func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+ uploadResult, err = uploader.retriedUploadData(ctx, data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
- uploadResult, err, data = uploader.doUpload(reader, option)
+func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = uploader.doUpload(ctx, reader, option)
return
}
-func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
@@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo
return
}
}
- uploadResult, uploadErr := uploader.retriedUploadData(data, option)
+ uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
return uploadResult, uploadErr, data
}
-func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
- uploadResult, err = uploader.doUploadData(data, option)
+ uploadResult, err = uploader.doUploadData(ctx, data, option)
if err == nil {
uploadResult.RetryCount = i
return
@@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (
return
}
-func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
if !option.IsInputCompressed {
@@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
}
// upload data
- uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
}, len(encryptedData), &UploadOption{
@@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
}
} else {
// upload data
- uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, len(data), &UploadOption{
@@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
return uploadResult, err
}
-func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
+func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer
var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer
@@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er
if option.Jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
+
+ util.ReqWithRequestId(req, ctx)
+
// print("+")
resp, post_err := uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp)