aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go6
-rw-r--r--weed/operation/assign_file_id_test.go2
-rw-r--r--weed/operation/needle_parse_test.go5
-rw-r--r--weed/operation/submit.go12
-rw-r--r--weed/operation/upload_content.go29
5 files changed, 29 insertions, 25 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 13418da1a..eb54c674b 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -139,7 +139,7 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri
return
}
-func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
- lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
@@ -165,7 +165,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
DataNode: request.DataNode,
WritableVolumeCount: request.WritableVolumeCount,
}
- resp, grpcErr := masterClient.Assign(context.Background(), req)
+ resp, grpcErr := masterClient.Assign(ctx, req)
if grpcErr != nil {
return grpcErr
}
diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go
index ac0f4eee6..b2ec7d92a 100644
--- a/weed/operation/assign_file_id_test.go
+++ b/weed/operation/assign_file_id_test.go
@@ -60,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) {
func BenchmarkUnaryAssign(b *testing.B) {
for i := 0; i < b.N; i++ {
- Assign(func(_ context.Context) pb.ServerAddress {
+ Assign(context.Background(), func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), &VolumeAssignRequest{
Count: 1,
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
index 7526a6e79..339d4507e 100644
--- a/weed/operation/needle_parse_test.go
+++ b/weed/operation/needle_parse_test.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "context"
"fmt"
"io"
"net/http"
@@ -58,7 +59,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption)
+ uploadResult, err, data := uploader.Upload(context.Background(), bytes.NewReader([]byte(textContent)), uploadOption)
if len(data) != len(textContent) {
t.Errorf("data actual %d expected %d", len(data), len(textContent))
}
@@ -86,7 +87,7 @@ func TestCreateNeedleFromRequest(t *testing.T) {
PairMap: nil,
Jwt: "",
}
- uploader.Upload(bytes.NewReader(gzippedData), uploadOption)
+ uploader.Upload(context.Background(), bytes.NewReader(gzippedData), uploadOption)
}
/*
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 9470afced..1efa42b2f 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -62,7 +62,7 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*
Ttl: pref.Ttl,
DiskType: pref.DiskType,
}
- ret, err := Assign(masterFn, grpcDialOption, ar)
+ ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil {
for index := range files {
results[index].Error = err.Error()
@@ -155,7 +155,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
Ttl: fi.Pref.Ttl,
DiskType: fi.Pref.DiskType,
}
- ret, err = Assign(masterFn, grpcDialOption, ar)
+ ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil {
return
}
@@ -169,7 +169,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
Ttl: fi.Pref.Ttl,
DiskType: fi.Pref.DiskType,
}
- ret, err = Assign(masterFn, grpcDialOption, ar)
+ ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
@@ -223,7 +223,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j
return 0, e
}
- ret, e, _ := uploader.Upload(fi.Reader, uploadOption)
+ ret, e, _ := uploader.Upload(context.Background(), fi.Reader, uploadOption)
if e != nil {
return 0, e
}
@@ -267,7 +267,7 @@ func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn,
return 0, uploaderError
}
- uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption)
+ uploadResult, uploadError, _ := uploader.Upload(context.Background(), reader, uploadOption)
if uploadError != nil {
return 0, uploadError
}
@@ -299,6 +299,6 @@ func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt secu
return e
}
- _, e = uploader.UploadData(buf, uploadOption)
+ _, e = uploader.UploadData(context.Background(), buf, uploadOption)
return e
}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 0cf6bf7cf..6d910674b 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
uploadOption.Jwt = auth
var uploadErr error
- uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption)
+ uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
@@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi
}
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
- uploadResult, err = uploader.retriedUploadData(data, option)
+func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+ uploadResult, err = uploader.retriedUploadData(ctx, data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
- uploadResult, err, data = uploader.doUpload(reader, option)
+func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = uploader.doUpload(ctx, reader, option)
return
}
-func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
+func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
@@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo
return
}
}
- uploadResult, uploadErr := uploader.retriedUploadData(data, option)
+ uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
return uploadResult, uploadErr, data
}
-func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
- uploadResult, err = uploader.doUploadData(data, option)
+ uploadResult, err = uploader.doUploadData(ctx, data, option)
if err == nil {
uploadResult.RetryCount = i
return
@@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (
return
}
-func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
+func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
if !option.IsInputCompressed {
@@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
}
// upload data
- uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
}, len(encryptedData), &UploadOption{
@@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
}
} else {
// upload data
- uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) {
+ uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, len(data), &UploadOption{
@@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa
return uploadResult, err
}
-func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
+func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer
var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer
@@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er
if option.Jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
+
+ util.ReqWithRequestId(req, ctx)
+
// print("+")
resp, post_err := uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp)