diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 2 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 2 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 12 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 4 | ||||
| -rw-r--r-- | weed/operation/stats.go | 4 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 4 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 4 |
7 files changed, 16 insertions, 16 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 2dfa44483..b67d8b708 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -44,7 +44,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum continue } - lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: primaryRequest.Count, diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index e4aa6c6d3..95bbde9f9 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -117,7 +117,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index f6b2b69e9..e7ee2d2ba 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -12,7 +12,7 @@ import ( "strings" ) -func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { +func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error { ctx := context.Background() @@ -21,9 +21,9 @@ func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, return err } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, grpcAddress, grpcDialOption) } @@ -38,7 +38,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil } -func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { +func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error { ctx := context.Background() @@ -47,9 +47,9 @@ func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr) } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, masterGrpcAddress, grpcDialOption) } diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index d0773e7fd..78769ac5a 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -99,12 +99,12 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin //only query unknown_vids - err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeIds: unknown_vids, } - resp, grpcErr := masterClient.LookupVolume(context.Background(), req) + resp, grpcErr := masterClient.LookupVolume(ctx, req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/stats.go b/weed/operation/stats.go index b69a33750..3e6327f19 100644 --- a/weed/operation/stats.go +++ b/weed/operation/stats.go @@ -9,9 +9,9 @@ import ( func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) { - err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { - grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req) + grpcResponse, grpcErr := masterClient.Statistics(ctx, req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index 5562f12ab..4b39ad544 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -8,9 +8,9 @@ import ( func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(server, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { - resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ + resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{ VolumeId: vid, }) return nil diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index b53f18ce1..1e8b0a16e 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -26,9 +26,9 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume } func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { - return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{ + stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{ VolumeId: uint32(vid), SinceNs: sinceNs, IdleTimeoutSeconds: uint32(idleTimeoutSeconds), |
