diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 6 | ||||
| -rw-r--r-- | weed/operation/assign_file_id_test.go | 7 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 3 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 2 | ||||
| -rw-r--r-- | weed/operation/submit.go | 3 |
5 files changed, 12 insertions, 9 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 1b7a0146d..cc8e87b21 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -47,9 +47,9 @@ func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concur ap = &AssignProxy{ pool: make(chan *singleThreadAssignProxy, concurrency), } - ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption) + ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption) if err != nil { - return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err) + return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err) } for i := 0; i < concurrency; i++ { ap.pool <- &singleThreadAssignProxy{} @@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest continue } - lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: request.Count, Replication: request.Replication, diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go index f6362dceb..ac0f4eee6 100644 --- a/weed/operation/assign_file_id_test.go +++ b/weed/operation/assign_file_id_test.go @@ -1,6 +1,7 @@ package operation import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "google.golang.org/grpc" @@ -11,7 +12,7 @@ import ( func BenchmarkWithConcurrency(b *testing.B) { concurrencyLevels := []int{1, 10, 100, 1000} - ap, _ := NewAssignProxy(func() pb.ServerAddress { + ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), 16) @@ -47,7 +48,7 @@ func BenchmarkWithConcurrency(b *testing.B) { } func BenchmarkStreamAssign(b *testing.B) { - ap, _ := NewAssignProxy(func() pb.ServerAddress { + ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), 16) for i := 0; i < b.N; i++ { @@ -59,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) { func BenchmarkUnaryAssign(b *testing.B) { for i := 0; i < b.N; i++ { - Assign(func() pb.ServerAddress { + Assign(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress("localhost:9333") }, grpc.WithInsecure(), &VolumeAssignRequest{ Count: 1, diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index eacf64112..c451420fe 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -1,6 +1,7 @@ package operation import ( + "context" "encoding/json" "errors" "fmt" @@ -173,7 +174,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < len(cf.chunkList); chunkIndex++ { ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress { + fileUrl, jwt, lookupError := LookupFileId(func(_ context.Context) pb.ServerAddress { return cf.master }, cf.grpcDialOption, ci.Fid) if lookupError != nil { diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index fc4609a2d..6c89c17b1 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -80,7 +80,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids //only query unknown_vids - err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeOrFileIds: unknown_vids, diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 3eb38c31e..57bd81b14 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,6 +1,7 @@ package operation import ( + "context" "github.com/seaweedfs/seaweedfs/weed/pb" "io" "mime" @@ -40,7 +41,7 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -type GetMasterFn func() pb.ServerAddress +type GetMasterFn func(ctx context.Context) pb.ServerAddress func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) |
