diff options
| author | Aleksey Kosov <rusyak777@list.ru> | 2025-05-28 21:34:02 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-05-28 11:34:02 -0700 |
| commit | 283d9e0079d5deb57aefe9a7b30e8b9869ba8685 (patch) | |
| tree | 87b09bebed2ee4afc9c2a4f711ac8598fe2949b7 /weed/operation | |
| parent | 62aaaa18f3ea8b7600d28934580dc220ca95164a (diff) | |
| download | seaweedfs-283d9e0079d5deb57aefe9a7b30e8b9869ba8685.tar.xz seaweedfs-283d9e0079d5deb57aefe9a7b30e8b9869ba8685.zip | |
Add context with request (#6824)
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 6 | ||||
| -rw-r--r-- | weed/operation/assign_file_id_test.go | 2 | ||||
| -rw-r--r-- | weed/operation/needle_parse_test.go | 5 | ||||
| -rw-r--r-- | weed/operation/submit.go | 12 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 29 |
5 files changed, 29 insertions, 25 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 13418da1a..eb54c674b 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -139,7 +139,7 @@ func (ap *singleThreadAssignProxy) doAssign(grpcConnection *grpc.ClientConn, pri return } -func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { +func Assign(ctx context.Context, masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) @@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(ctx), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, Replication: request.Replication, @@ -165,7 +165,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest DataNode: request.DataNode, WritableVolumeCount: request.WritableVolumeCount, } - resp, grpcErr := masterClient.Assign(context.Background(), req) + resp, grpcErr := masterClient.Assign(ctx, req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go index ac0f4eee6..b2ec7d92a 100644 --- a/weed/operation/assign_file_id_test.go +++ b/weed/operation/assign_file_id_test.go @@ -60,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) { func BenchmarkUnaryAssign(b *testing.B) { for i := 0; i < b.N; i++ { - Assign(func(_ context.Context) pb.ServerAddress { + Assign(context.Background(), func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), &VolumeAssignRequest{ Count: 1, diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go index 7526a6e79..339d4507e 100644 --- a/weed/operation/needle_parse_test.go +++ b/weed/operation/needle_parse_test.go @@ -2,6 +2,7 @@ package operation import ( "bytes" + "context" "fmt" "io" "net/http" @@ -58,7 +59,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - uploadResult, err, data := uploader.Upload(bytes.NewReader([]byte(textContent)), uploadOption) + uploadResult, err, data := uploader.Upload(context.Background(), bytes.NewReader([]byte(textContent)), uploadOption) if len(data) != len(textContent) { t.Errorf("data actual %d expected %d", len(data), len(textContent)) } @@ -86,7 +87,7 @@ func TestCreateNeedleFromRequest(t *testing.T) { PairMap: nil, Jwt: "", } - uploader.Upload(bytes.NewReader(gzippedData), uploadOption) + uploader.Upload(context.Background(), bytes.NewReader(gzippedData), uploadOption) } /* diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 9470afced..1efa42b2f 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -62,7 +62,7 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []* Ttl: pref.Ttl, DiskType: pref.DiskType, } - ret, err := Assign(masterFn, grpcDialOption, ar) + ret, err := Assign(context.Background(), masterFn, grpcDialOption, ar) if err != nil { for index := range files { results[index].Error = err.Error() @@ -155,7 +155,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j Ttl: fi.Pref.Ttl, DiskType: fi.Pref.DiskType, } - ret, err = Assign(masterFn, grpcDialOption, ar) + ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar) if err != nil { return } @@ -169,7 +169,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j Ttl: fi.Pref.Ttl, DiskType: fi.Pref.DiskType, } - ret, err = Assign(masterFn, grpcDialOption, ar) + ret, err = Assign(context.Background(), masterFn, grpcDialOption, ar) if err != nil { // delete all uploaded chunks cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) @@ -223,7 +223,7 @@ func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, j return 0, e } - ret, e, _ := uploader.Upload(fi.Reader, uploadOption) + ret, e, _ := uploader.Upload(context.Background(), fi.Reader, uploadOption) if e != nil { return 0, e } @@ -267,7 +267,7 @@ func uploadOneChunk(filename string, reader io.Reader, masterFn GetMasterFn, return 0, uploaderError } - uploadResult, uploadError, _ := uploader.Upload(reader, uploadOption) + uploadResult, uploadError, _ := uploader.Upload(context.Background(), reader, uploadOption) if uploadError != nil { return 0, uploadError } @@ -299,6 +299,6 @@ func uploadChunkedFileManifest(fileUrl string, manifest *ChunkManifest, jwt secu return e } - _, e = uploader.UploadData(buf, uploadOption) + _, e = uploader.UploadData(context.Background(), buf, uploadOption) return e } diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 0cf6bf7cf..6d910674b 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -134,7 +134,7 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi uploadOption.Jwt = auth var uploadErr error - uploadResult, uploadErr, data = uploader.doUpload(reader, uploadOption) + uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption) return uploadErr } if uploadOption.RetryForever { @@ -151,18 +151,18 @@ func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assi } // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func (uploader *Uploader) UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { - uploadResult, err = uploader.retriedUploadData(data, option) +func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = uploader.retriedUploadData(ctx, data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func (uploader *Uploader) Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = uploader.doUpload(reader, option) +func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = uploader.doUpload(ctx, reader, option) return } -func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { +func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes @@ -173,16 +173,16 @@ func (uploader *Uploader) doUpload(reader io.Reader, option *UploadOption) (uplo return } } - uploadResult, uploadErr := uploader.retriedUploadData(data, option) + uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option) return uploadResult, uploadErr, data } -func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { if i > 0 { time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } - uploadResult, err = uploader.doUploadData(data, option) + uploadResult, err = uploader.doUploadData(ctx, data, option) if err == nil { uploadResult.RetryCount = i return @@ -192,7 +192,7 @@ func (uploader *Uploader) retriedUploadData(data []byte, option *UploadOption) ( return } -func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { +func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { contentIsGzipped := option.IsInputCompressed shouldGzipNow := false if !option.IsInputCompressed { @@ -248,7 +248,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa } // upload data - uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return }, len(encryptedData), &UploadOption{ @@ -272,7 +272,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa } } else { // upload data - uploadResult, err = uploader.upload_content(func(w io.Writer) (err error) { + uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) { _, err = w.Write(data) return }, len(data), &UploadOption{ @@ -298,7 +298,7 @@ func (uploader *Uploader) doUploadData(data []byte, option *UploadOption) (uploa return uploadResult, err } -func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { +func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { var body_writer *multipart.Writer var reqReader *bytes.Reader var buf *bytebufferpool.ByteBuffer @@ -358,6 +358,9 @@ func (uploader *Uploader) upload_content(fillBufferFunction func(w io.Writer) er if option.Jwt != "" { req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } + + util.ReqWithRequestId(req, ctx) + // print("+") resp, post_err := uploader.httpClient.Do(req) defer util_http.CloseResponse(resp) |
