aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go6
-rw-r--r--weed/operation/assign_file_id_test.go7
-rw-r--r--weed/operation/chunked_file.go3
-rw-r--r--weed/operation/lookup.go2
-rw-r--r--weed/operation/submit.go3
5 files changed, 12 insertions, 9 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 1b7a0146d..cc8e87b21 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -47,9 +47,9 @@ func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concur
ap = &AssignProxy{
pool: make(chan *singleThreadAssignProxy, concurrency),
}
- ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
+ ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption)
if err != nil {
- return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
+ return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err)
}
for i := 0; i < concurrency; i++ {
ap.pool <- &singleThreadAssignProxy{}
@@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
- lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go
index f6362dceb..ac0f4eee6 100644
--- a/weed/operation/assign_file_id_test.go
+++ b/weed/operation/assign_file_id_test.go
@@ -1,6 +1,7 @@
package operation
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
@@ -11,7 +12,7 @@ import (
func BenchmarkWithConcurrency(b *testing.B) {
concurrencyLevels := []int{1, 10, 100, 1000}
- ap, _ := NewAssignProxy(func() pb.ServerAddress {
+ ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
@@ -47,7 +48,7 @@ func BenchmarkWithConcurrency(b *testing.B) {
}
func BenchmarkStreamAssign(b *testing.B) {
- ap, _ := NewAssignProxy(func() pb.ServerAddress {
+ ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
for i := 0; i < b.N; i++ {
@@ -59,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) {
func BenchmarkUnaryAssign(b *testing.B) {
for i := 0; i < b.N; i++ {
- Assign(func() pb.ServerAddress {
+ Assign(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), &VolumeAssignRequest{
Count: 1,
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index eacf64112..c451420fe 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -1,6 +1,7 @@
package operation
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -173,7 +174,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
+ fileUrl, jwt, lookupError := LookupFileId(func(_ context.Context) pb.ServerAddress {
return cf.master
}, cf.grpcDialOption, ci.Fid)
if lookupError != nil {
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index fc4609a2d..6c89c17b1 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -80,7 +80,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
//only query unknown_vids
- err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeOrFileIds: unknown_vids,
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 3eb38c31e..57bd81b14 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,6 +1,7 @@
package operation
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"mime"
@@ -40,7 +41,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-type GetMasterFn func() pb.ServerAddress
+type GetMasterFn func(ctx context.Context) pb.ServerAddress
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))