aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go2
-rw-r--r--weed/operation/delete_content.go2
-rw-r--r--weed/operation/grpc_client.go12
-rw-r--r--weed/operation/lookup.go4
-rw-r--r--weed/operation/stats.go4
-rw-r--r--weed/operation/sync_volume.go4
-rw-r--r--weed/operation/tail_volume.go4
7 files changed, 16 insertions, 16 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 2dfa44483..b67d8b708 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -44,7 +44,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
continue
}
- lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: primaryRequest.Count,
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index e4aa6c6d3..95bbde9f9 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -117,7 +117,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index f6b2b69e9..e7ee2d2ba 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -12,7 +12,7 @@ import (
"strings"
)
-func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
+func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error {
ctx := context.Background()
@@ -21,9 +21,9 @@ func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption,
return err
}
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
- return fn(client)
+ return fn(ctx2, client)
}, grpcAddress, grpcDialOption)
}
@@ -38,7 +38,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
-func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
+func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error {
ctx := context.Background()
@@ -47,9 +47,9 @@ func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption,
return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
}
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
- return fn(client)
+ return fn(ctx2, client)
}, masterGrpcAddress, grpcDialOption)
}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index d0773e7fd..78769ac5a 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -99,12 +99,12 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin
//only query unknown_vids
- err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeIds: unknown_vids,
}
- resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
+ resp, grpcErr := masterClient.LookupVolume(ctx, req)
if grpcErr != nil {
return grpcErr
}
diff --git a/weed/operation/stats.go b/weed/operation/stats.go
index b69a33750..3e6327f19 100644
--- a/weed/operation/stats.go
+++ b/weed/operation/stats.go
@@ -9,9 +9,9 @@ import (
func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
- err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
- grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req)
+ grpcResponse, grpcErr := masterClient.Statistics(ctx, req)
if grpcErr != nil {
return grpcErr
}
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index 5562f12ab..4b39ad544 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -8,9 +8,9 @@ import (
func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(server, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
- resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
+ resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{
VolumeId: vid,
})
return nil
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index b53f18ce1..1e8b0a16e 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -26,9 +26,9 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume
}
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
- return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ return WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
- stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
+ stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
VolumeId: uint32(vid),
SinceNs: sinceNs,
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),