diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_ec_common.go | 8 | ||||
| -rw-r--r-- | weed/shell/command_ec_decode.go | 6 | ||||
| -rw-r--r-- | weed/shell/command_ec_encode.go | 4 | ||||
| -rw-r--r-- | weed/shell/command_ec_rebuild.go | 4 | ||||
| -rw-r--r-- | weed/shell/command_fs_cat.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_fs_du.go | 8 | ||||
| -rw-r--r-- | weed/shell/command_fs_meta_cat.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_fs_meta_load.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_fs_mv.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_volume_mount.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_volume_move.go | 6 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_download.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_upload.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_volume_unmount.go | 2 | ||||
| -rw-r--r-- | weed/shell/commands.go | 2 |
16 files changed, 28 insertions, 28 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 2beed4742..e187d5a3b 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -56,7 +56,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id) - err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { if targetServer.info.Id != existingLocation { @@ -216,7 +216,7 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -232,7 +232,7 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{ VolumeId: uint32(volumeId), ShardIds: toBeUnmountedhardIds, @@ -246,7 +246,7 @@ func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption, fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation) - return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 1f9ad2ff9..8a705a5ae 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -99,7 +99,7 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume - if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(vid), }) @@ -132,7 +132,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer) - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{ VolumeId: uint32(vid), Collection: collection, @@ -170,7 +170,7 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB continue } - err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 58527abf2..587b59388 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -120,7 +120,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol for _, location := range locations { - err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), }) @@ -138,7 +138,7 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 2e2fca743..600a8cb45 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -170,7 +170,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { - err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{ VolumeId: uint32(volumeId), Collection: collection, @@ -209,7 +209,7 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go index 9db36e9d1..238dee7f9 100644 --- a/weed/shell/command_fs_cat.go +++ b/weed/shell/command_fs_cat.go @@ -46,7 +46,7 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write dir, name := filer2.FullPath(path).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index 2c46350b2..d6ea51d0c 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -82,12 +82,12 @@ func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient file return } -func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error { +func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000) - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) + return fn(ctx2, client) }, filerGrpcAddress, env.option.GrpcDialOption) } @@ -105,6 +105,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm filerPort: filerPort, } } -func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn) } diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 5908b0a3c..9980f67a2 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -45,7 +45,7 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W dir, name := filer2.FullPath(path).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 52dd0321b..8f2ef95e3 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -55,7 +55,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. ctx := context.Background() - err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go index 67606ab53..e77755921 100644 --- a/weed/shell/command_fs_mv.go +++ b/weed/shell/command_fs_mv.go @@ -53,7 +53,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { // collect destination entry info destinationRequest := &filer_pb.LookupDirectoryEntryRequest{ diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 6f35dd5d2..7a1a77cbe 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -113,7 +113,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, break } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ VolumeId: volumeInfo.Id, SourceDataNode: sourceNode.dataNode.Id, diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go index 50a307492..21bc342b4 100644 --- a/weed/shell/command_volume_mount.go +++ b/weed/shell/command_volume_mount.go @@ -51,7 +51,7 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io } func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index e74b43ed4..2e39c0600 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -88,7 +88,7 @@ func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeI func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) { - err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: sourceVolumeServer, @@ -104,7 +104,7 @@ func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) { - return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{ VolumeId: uint32(volumeId), SinceNs: lastAppendAtNs, @@ -117,7 +117,7 @@ func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne } func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index 4584289d7..0f1a1bb6e 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -118,7 +118,7 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error { - err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(ctx, &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index 0a9e6165f..20da1187c 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -114,7 +114,7 @@ func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.W func uploadDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(ctx, &volume_server_pb.VolumeTierMoveDatToRemoteRequest{ VolumeId: uint32(volumeId), Collection: collection, diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go index 8096f34d8..826258dfb 100644 --- a/weed/shell/command_volume_unmount.go +++ b/weed/shell/command_volume_unmount.go @@ -51,7 +51,7 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer } func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) { - return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{ VolumeId: uint32(volumeId), }) diff --git a/weed/shell/commands.go b/weed/shell/commands.go index a6a0f7dec..f1fcb62d4 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -70,7 +70,7 @@ func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, fi dir, name := filer2.FullPath(path).DirAndName() - return ce.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + return ce.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ Directory: dir, |
