aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-02-25 21:50:12 -0800
committerChris Lu <chris.lu@gmail.com>2020-02-25 21:50:12 -0800
commit892e726eb9c2427634c46f8ae9b7bcf0b6d1b082 (patch)
tree3bf821356579902219633c6f6d42739deb1edd2d /weed/shell
parentbd3254b53f78b8f42e31ea50cbf2e0d7e87b2bbc (diff)
downloadseaweedfs-892e726eb9c2427634c46f8ae9b7bcf0b6d1b082.tar.xz
seaweedfs-892e726eb9c2427634c46f8ae9b7bcf0b6d1b082.zip
avoid reusing context object
fix https://github.com/chrislusf/seaweedfs/issues/1182
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_bucket_create.go8
-rw-r--r--weed/shell/command_bucket_delete.go8
-rw-r--r--weed/shell/command_bucket_list.go8
-rw-r--r--weed/shell/command_collection_delete.go5
-rw-r--r--weed/shell/command_collection_list.go5
-rw-r--r--weed/shell/command_ec_balance.go46
-rw-r--r--weed/shell/command_ec_common.go43
-rw-r--r--weed/shell/command_ec_decode.go43
-rw-r--r--weed/shell/command_ec_encode.go49
-rw-r--r--weed/shell/command_ec_rebuild.go29
-rw-r--r--weed/shell/command_ec_test.go2
-rw-r--r--weed/shell/command_fs_cat.go8
-rw-r--r--weed/shell/command_fs_cd.go5
-rw-r--r--weed/shell/command_fs_du.go23
-rw-r--r--weed/shell/command_fs_ls.go7
-rw-r--r--weed/shell/command_fs_meta_cat.go6
-rw-r--r--weed/shell/command_fs_meta_load.go10
-rw-r--r--weed/shell/command_fs_meta_save.go2
-rw-r--r--weed/shell/command_fs_mv.go8
-rw-r--r--weed/shell/command_fs_tree.go2
-rw-r--r--weed/shell/command_volume_balance.go8
-rw-r--r--weed/shell/command_volume_configure_replication.go9
-rw-r--r--weed/shell/command_volume_copy.go4
-rw-r--r--weed/shell/command_volume_delete.go4
-rw-r--r--weed/shell/command_volume_fix_replication.go9
-rw-r--r--weed/shell/command_volume_list.go5
-rw-r--r--weed/shell/command_volume_mount.go9
-rw-r--r--weed/shell/command_volume_move.go29
-rw-r--r--weed/shell/command_volume_tier_download.go21
-rw-r--r--weed/shell/command_volume_tier_upload.go19
-rw-r--r--weed/shell/command_volume_unmount.go9
-rw-r--r--weed/shell/commands.go12
32 files changed, 202 insertions, 253 deletions
diff --git a/weed/shell/command_bucket_create.go b/weed/shell/command_bucket_create.go
index 603e9c564..3546528aa 100644
--- a/weed/shell/command_bucket_create.go
+++ b/weed/shell/command_bucket_create.go
@@ -48,11 +48,9 @@ func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer i
return parseErr
}
- ctx := context.Background()
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
- err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
-
- resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err)
}
@@ -72,7 +70,7 @@ func (c *commandBucketCreate) Do(args []string, commandEnv *CommandEnv, writer i
},
}
- if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: filerBucketsPath,
Entry: entry,
}); err != nil {
diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go
index 9e814ccf9..c57ce7221 100644
--- a/weed/shell/command_bucket_delete.go
+++ b/weed/shell/command_bucket_delete.go
@@ -44,17 +44,15 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i
return parseErr
}
- ctx := context.Background()
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
- err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
-
- resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err)
}
filerBucketsPath := resp.DirBuckets
- if _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
+ if _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
Directory: filerBucketsPath,
Name: *bucketName,
IsDeleteData: false,
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go
index 32198c29d..5eb5972ce 100644
--- a/weed/shell/command_bucket_list.go
+++ b/weed/shell/command_bucket_list.go
@@ -39,17 +39,15 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.
return parseErr
}
- ctx := context.Background()
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
- err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
-
- resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s:%d configuration: %v", filerServer, filerPort, err)
}
filerBucketsPath := resp.DirBuckets
- stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
+ stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: filerBucketsPath,
Limit: math.MaxUint32,
})
diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go
index fbaddcd51..4b3d7f0be 100644
--- a/weed/shell/command_collection_delete.go
+++ b/weed/shell/command_collection_delete.go
@@ -34,9 +34,8 @@ func (c *commandCollectionDelete) Do(args []string, commandEnv *CommandEnv, writ
collectionName := args[0]
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- _, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
return err
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go
index c4325c66f..2a114e61b 100644
--- a/weed/shell/command_collection_list.go
+++ b/weed/shell/command_collection_list.go
@@ -41,9 +41,8 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer
func ListCollectionNames(commandEnv *CommandEnv, includeNormalVolumes, includeEcVolumes bool) (collections []string, err error) {
var resp *master_pb.CollectionListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.CollectionList(ctx, &master_pb.CollectionListRequest{
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: includeNormalVolumes,
IncludeEcVolumes: includeEcVolumes,
})
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 96599372e..7230a869f 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -107,10 +107,8 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
return nil
}
- ctx := context.Background()
-
// collect all ec nodes
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, *dc)
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc)
if err != nil {
return err
}
@@ -138,7 +136,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
}
}
- if err := balanceEcRacks(ctx, commandEnv, racks, *applyBalancing); err != nil {
+ if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil {
return fmt.Errorf("balance ec racks: %v", err)
}
@@ -170,11 +168,11 @@ func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*E
return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
}
- if err := balanceEcShardsAcrossRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
}
- if err := balanceEcShardsWithinRacks(ctx, commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
+ if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
}
@@ -186,14 +184,14 @@ func deleteDuplicatedEcShards(ctx context.Context, commandEnv *CommandEnv, allEc
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
// deduplicate ec shards
for vid, locations := range vidLocations {
- if err := doDeduplicateEcShards(ctx, commandEnv, collection, vid, locations, applyBalancing); err != nil {
+ if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
+func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
// check whether this volume has ecNodes that are over average
shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
@@ -215,10 +213,10 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti
duplicatedShardIds := []uint32{uint32(shardId)}
for _, ecNode := range ecNodes[1:] {
- if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
- if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
@@ -227,19 +225,19 @@ func doDeduplicateEcShards(ctx context.Context, commandEnv *CommandEnv, collecti
return nil
}
-func balanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
// spread the ec shards evenly
for vid, locations := range vidLocations {
- if err := doBalanceEcShardsAcrossRacks(ctx, commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
+ if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
+func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
// calculate average number of shards an ec rack should have for one volume
averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
@@ -274,7 +272,7 @@ func doBalanceEcShardsAcrossRacks(ctx context.Context, commandEnv *CommandEnv, c
for _, n := range racks[rackId].ecNodes {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
- err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
if err != nil {
return err
}
@@ -306,7 +304,7 @@ func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]i
return ""
}
-func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
+func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := collectVolumeIdToEcNodes(allEcNodes)
@@ -330,7 +328,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
- if err := doBalanceEcShardsWithinOneRack(ctx, commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
+ if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
return err
}
}
@@ -338,7 +336,7 @@ func balanceEcShardsWithinRacks(ctx context.Context, commandEnv *CommandEnv, all
return nil
}
-func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
for _, ecNode := range existingLocations {
@@ -353,7 +351,7 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv,
fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
- err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
if err != nil {
return err
}
@@ -365,18 +363,18 @@ func doBalanceEcShardsWithinOneRack(ctx context.Context, commandEnv *CommandEnv,
return nil
}
-func balanceEcRacks(ctx context.Context, commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
+func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
// balance one rack for all ec shards
for _, ecRack := range racks {
- if err := doBalanceEcRack(ctx, commandEnv, ecRack, applyBalancing); err != nil {
+ if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
return err
}
}
return nil
}
-func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
+func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
if len(ecRack.ecNodes) <= 1 {
return nil
@@ -421,7 +419,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
- err := moveMountedShardToEcNode(ctx, commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+ err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
if err != nil {
return err
}
@@ -440,7 +438,7 @@ func doBalanceEcRack(ctx context.Context, commandEnv *CommandEnv, ecRack *EcRack
return nil
}
-func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
sortEcNodesByFreeslotsDecending(possibleDestinationEcNodes)
@@ -458,7 +456,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *CommandEnv, a
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
- err := moveMountedShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
+ err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
if err != nil {
return err
}
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index e187d5a3b..0db119d3c 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -15,26 +15,26 @@ import (
"google.golang.org/grpc"
)
-func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
copiedShardIds := []uint32{uint32(shardId)}
if applyBalancing {
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
// ask source node to delete the shard, and maybe the ecx file
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
@@ -50,18 +50,18 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist
}
-func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
+func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
- err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if targetServer.info.Id != existingLocation {
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
@@ -76,7 +76,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
}
fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
@@ -178,12 +178,12 @@ type EcRack struct {
freeEcSlot int
}
-func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -211,13 +211,12 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCen
return
}
-func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeDeletedShardIds,
@@ -227,13 +226,12 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt
}
-func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
})
@@ -241,13 +239,12 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
})
}
-func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
- return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeMountedhardIds,
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 8a705a5ae..b69e403cb 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -43,25 +43,24 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// collect topology information
- topologyInfo, err := collectTopologyInfo(ctx, commandEnv)
+ topologyInfo, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
// volumeId is provided
if vid != 0 {
- return doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid)
+ return doEcDecode(commandEnv, topologyInfo, *collection, vid)
}
// apply to all volumes in the collection
volumeIds := collectEcShardIds(topologyInfo, *collection)
fmt.Printf("ec encode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doEcDecode(ctx, commandEnv, topologyInfo, *collection, vid); err != nil {
+ if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
return err
}
}
@@ -69,26 +68,26 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
-func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
+func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
// find volume location
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
// collect ec shards to the server with most space
- targetNodeLocation, err := collectEcShards(ctx, commandEnv, nodeToEcIndexBits, collection, vid)
+ targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
if err != nil {
return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
}
// generate a normal volume
- err = generateNormalVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
+ err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
if err != nil {
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
}
// delete the previous ec shards
- err = mountVolumeAndDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
+ err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
if err != nil {
return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
}
@@ -96,11 +95,11 @@ func doEcDecode(ctx context.Context, commandEnv *CommandEnv, topoInfo *master_pb
return nil
}
-func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
+func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
- if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
+ if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(vid),
})
return mountErr
@@ -111,7 +110,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO
// unmount ec shards
for location, ecIndexBits := range nodeToEcIndexBits {
fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := unmountEcShards(ctx, grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
+ err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
if err != nil {
return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
}
@@ -119,7 +118,7 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO
// delete ec shards
for location, ecIndexBits := range nodeToEcIndexBits {
fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := sourceServerDeleteEcShards(ctx, grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
+ err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
if err != nil {
return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
}
@@ -128,12 +127,12 @@ func mountVolumeAndDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialO
return nil
}
-func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, genErr := volumeServerClient.VolumeEcShardsToVolume(ctx, &volume_server_pb.VolumeEcShardsToVolumeRequest{
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
VolumeId: uint32(vid),
Collection: collection,
})
@@ -144,7 +143,7 @@ func generateNormalVolume(ctx context.Context, grpcDialOption grpc.DialOption, v
}
-func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
+func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
maxShardCount := 0
var exisitngEcIndexBits erasure_coding.ShardBits
@@ -170,11 +169,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB
continue
}
- err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(vid),
Collection: collection,
ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
@@ -204,11 +203,11 @@ func collectEcShards(ctx context.Context, commandEnv *CommandEnv, nodeToEcIndexB
}
-func collectTopologyInfo(ctx context.Context, commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
+func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 587b59388..e22691c00 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -63,22 +63,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// volumeId is provided
if vid != 0 {
- return doEcEncode(ctx, commandEnv, *collection, vid)
+ return doEcEncode(commandEnv, *collection, vid)
}
// apply to all volumes in the collection
- volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
fmt.Printf("ec encode volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
+ if err = doEcEncode(commandEnv, *collection, vid); err != nil {
return err
}
}
@@ -86,7 +85,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
-func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
@@ -96,19 +95,19 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
- err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
// generate ec shards
- err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
+ err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
if err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
// balance the ec shards to current cluster
- err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
+ err = spreadEcShards(context.Background(), commandEnv, vid, collection, locations)
if err != nil {
return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
}
@@ -116,12 +115,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string,
return nil
}
-func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
+func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
for _, location := range locations {
- err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
+ err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
VolumeId: uint32(volumeId),
})
return markErr
@@ -136,10 +135,10 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol
return nil
}
-func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
+func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -152,7 +151,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum
func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "")
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
if err != nil {
return err
}
@@ -169,26 +168,26 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
// ask the data nodes to copy from the source volume server
- copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
+ copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
if err != nil {
return err
}
// unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return err
}
// ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
}
// ask the source volume server to delete the original volume
for _, location := range existingLocations {
- err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
+ err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
if err != nil {
return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
}
@@ -198,9 +197,7 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle
}
-func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
- targetServers []*EcNode, allocatedEcIds [][]uint32,
- volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
+func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
// parallelize
shardIdChan := make(chan []uint32, len(targetServers))
@@ -213,7 +210,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
wg.Add(1)
go func(server *EcNode, allocatedEcShardIds []uint32) {
defer wg.Done()
- copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
+ copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
allocatedEcShardIds, volumeId, collection, existingLocation.Url)
if copyErr != nil {
err = copyErr
@@ -255,11 +252,11 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
return allocated
}
-func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 600a8cb45..d9d943e6d 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -64,7 +64,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
}
// collect all ec nodes
- allEcNodes, _, err := collectEcNodes(context.Background(), commandEnv, "")
+ allEcNodes, _, err := collectEcNodes(commandEnv, "")
if err != nil {
return err
}
@@ -92,8 +92,6 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error {
- ctx := context.Background()
-
fmt.Printf("rebuildEcVolumes %s\n", collection)
// collect vid => each shard locations, similar to ecShardMap in topology.go
@@ -117,7 +115,7 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
return fmt.Errorf("disk space is not enough")
}
- if err := rebuildOneEcVolume(ctx, commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
+ if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil {
return err
}
}
@@ -125,13 +123,13 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s
return nil
}
-func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
+func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error {
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
// collect shard files to rebuilder local disk
var generatedShardIds []uint32
- copiedShardIds, _, err := prepareDataToRecover(ctx, commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
+ copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges)
if err != nil {
return err
}
@@ -139,7 +137,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
// clean up working files
// ask the rebuilder to delete the copied shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds)
if err != nil {
fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds)
}
@@ -151,13 +149,13 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
}
// generate ec shards, and maybe ecx file
- generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
+ generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id)
if err != nil {
return err
}
// mount the generated shards
- err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
+ err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds)
if err != nil {
return err
}
@@ -167,11 +165,10 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *
return nil
}
-func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
+func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) {
- err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{
+ err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -183,7 +180,7 @@ func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption,
return
}
-func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
+func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) {
needEcxFile := true
var localShardBits erasure_coding.ShardBits
@@ -209,8 +206,8 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder
var copyErr error
if applyBalancing {
- copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: []uint32{uint32(shardId)},
diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go
index c233d25d0..ddd52303c 100644
--- a/weed/shell/command_ec_test.go
+++ b/weed/shell/command_ec_test.go
@@ -121,7 +121,7 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
racks := collectRacks(allEcNodes)
balanceEcVolumes(nil, "c1", allEcNodes, racks, false)
- balanceEcRacks(context.Background(), nil, racks, false)
+ balanceEcRacks(nil, racks, false)
}
func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode {
diff --git a/weed/shell/command_fs_cat.go b/weed/shell/command_fs_cat.go
index 06c8232c9..8364e0de1 100644
--- a/weed/shell/command_fs_cat.go
+++ b/weed/shell/command_fs_cat.go
@@ -38,21 +38,19 @@ func (c *commandFsCat) Do(args []string, commandEnv *CommandEnv, writer io.Write
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(filerServer, filerPort, path) {
return fmt.Errorf("%s is a directory", path)
}
dir, name := filer2.FullPath(path).DirAndName()
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
- respLookupEntry, err := client.LookupDirectoryEntry(ctx, request)
+ respLookupEntry, err := client.LookupDirectoryEntry(context.Background(), request)
if err != nil {
return err
}
diff --git a/weed/shell/command_fs_cd.go b/weed/shell/command_fs_cd.go
index 408ec86c8..df42cd516 100644
--- a/weed/shell/command_fs_cd.go
+++ b/weed/shell/command_fs_cd.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"io"
)
@@ -45,9 +44,7 @@ func (c *commandFsCd) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return nil
}
- ctx := context.Background()
-
- err = commandEnv.checkDirectory(ctx, filerServer, filerPort, path)
+ err = commandEnv.checkDirectory(filerServer, filerPort, path)
if err == nil {
commandEnv.option.FilerHost = filerServer
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index d6ea51d0c..a1e21bfa6 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -39,15 +38,13 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(filerServer, filerPort, path) {
path = path + "/"
}
var blockCount, byteCount uint64
dir, name := filer2.FullPath(path).DirAndName()
- blockCount, byteCount, err = duTraverseDirectory(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name)
+ blockCount, byteCount, err = duTraverseDirectory(writer, commandEnv.getFilerClient(filerServer, filerPort), dir, name)
if name == "" && err == nil {
fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir)
@@ -57,15 +54,15 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer
}
-func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount uint64, byteCount uint64, err error) {
+func duTraverseDirectory(writer io.Writer, filerClient filer2.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) {
- err = filer2.ReadDirAllEntries(ctx, filerClient, filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer2.ReadDirAllEntries(filerClient, filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
if entry.IsDirectory {
subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
if dir == "/" {
subDir = "/" + entry.Name
}
- numBlock, numByte, err := duTraverseDirectory(ctx, writer, filerClient, subDir, "")
+ numBlock, numByte, err := duTraverseDirectory(writer, filerClient, subDir, "")
if err == nil {
blockCount += numBlock
byteCount += numByte
@@ -82,12 +79,12 @@ func duTraverseDirectory(ctx context.Context, writer io.Writer, filerClient file
return
}
-func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
+func (env *CommandEnv) withFilerClient(filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
- return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
+ return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(ctx2, client)
+ return fn(client)
}, filerGrpcAddress, env.option.GrpcDialOption)
}
@@ -105,6 +102,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm
filerPort: filerPort,
}
}
-func (c *commandFilerClient) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
- return c.env.withFilerClient(ctx, c.filerServer, c.filerPort, fn)
+func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+ return c.env.withFilerClient(c.filerServer, c.filerPort, fn)
}
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 0c63f71fa..69ebe1b30 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
"os"
@@ -60,16 +59,14 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return err
}
- ctx := context.Background()
-
- if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
+ if commandEnv.isDirectory(filerServer, filerPort, path) {
path = path + "/"
}
dir, name := filer2.FullPath(path).DirAndName()
entryCount := 0
- err = filer2.ReadDirAllEntries(ctx, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer2.ReadDirAllEntries(commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
if !showHidden && strings.HasPrefix(entry.Name, ".") {
return
diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go
index ec9a495f2..ec5a093df 100644
--- a/weed/shell/command_fs_meta_cat.go
+++ b/weed/shell/command_fs_meta_cat.go
@@ -41,17 +41,15 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W
return err
}
- ctx := context.Background()
-
dir, name := filer2.FullPath(path).DirAndName()
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
- respLookupEntry, err := client.LookupDirectoryEntry(ctx, request)
+ respLookupEntry, err := client.LookupDirectoryEntry(context.Background(), request)
if err != nil {
return err
}
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 8f2ef95e3..ed92d8011 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -1,15 +1,15 @@
package shell
import (
- "context"
"fmt"
"io"
"os"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
)
func init() {
@@ -53,9 +53,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- ctx := context.Background()
-
- err = commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ err = commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
sizeBuf := make([]byte, 4)
@@ -80,7 +78,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
return err
}
- if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: fullEntry.Dir,
Entry: fullEntry.Entry,
}); err != nil {
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index 178c826d5..7112c7526 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -168,7 +168,7 @@ func processOneDirectory(ctx context.Context, writer io.Writer, filerClient file
parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup,
fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) {
- return filer2.ReadDirAllEntries(ctx, filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) {
+ return filer2.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) {
fn(parentPath, entry)
diff --git a/weed/shell/command_fs_mv.go b/weed/shell/command_fs_mv.go
index b9301ad3c..78f797f6c 100644
--- a/weed/shell/command_fs_mv.go
+++ b/weed/shell/command_fs_mv.go
@@ -47,20 +47,18 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
return err
}
- ctx := context.Background()
-
sourceDir, sourceName := filer2.FullPath(sourcePath).DirAndName()
destinationDir, destinationName := filer2.FullPath(destinationPath).DirAndName()
- return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return commandEnv.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
// collect destination entry info
destinationRequest := &filer_pb.LookupDirectoryEntryRequest{
Name: destinationDir,
Directory: destinationName,
}
- respDestinationLookupEntry, err := client.LookupDirectoryEntry(ctx, destinationRequest)
+ respDestinationLookupEntry, err := client.LookupDirectoryEntry(context.Background(), destinationRequest)
var targetDir, targetName string
@@ -82,7 +80,7 @@ func (c *commandFsMv) Do(args []string, commandEnv *CommandEnv, writer io.Writer
NewName: targetName,
}
- _, err = client.AtomicRenameEntry(ctx, request)
+ _, err = client.AtomicRenameEntry(context.Background(), request)
fmt.Fprintf(writer, "move: %s => %s\n", sourcePath, filer2.NewFullPath(targetDir, targetName))
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
index 8660030e3..fb2583240 100644
--- a/weed/shell/command_fs_tree.go
+++ b/weed/shell/command_fs_tree.go
@@ -53,7 +53,7 @@ func treeTraverseDirectory(ctx context.Context, writer io.Writer, filerClient fi
prefix.addMarker(level)
- err = filer2.ReadDirAllEntries(ctx, filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer2.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) {
if level < 0 && name != "" {
if entry.Name != name {
return
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 488beb998..349f52f1c 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -69,9 +69,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
}
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -239,8 +238,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
}
fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyBalancing {
- ctx := context.Background()
- return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
}
return nil
}
diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go
index 6000d0de0..133ec62c6 100644
--- a/weed/shell/command_volume_configure_replication.go
+++ b/weed/shell/command_volume_configure_replication.go
@@ -53,9 +53,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
replicaPlacementInt32 := uint32(replicaPlacement.Byte())
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -81,8 +80,8 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman
}
for _, dst := range allLocations {
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, configureErr := volumeServerClient.VolumeConfigure(ctx, &volume_server_pb.VolumeConfigureRequest{
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, configureErr := volumeServerClient.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
VolumeId: uint32(vid),
Replication: replicaPlacement.String(),
})
diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go
index 1c83ba655..aecc071ad 100644
--- a/weed/shell/command_volume_copy.go
+++ b/weed/shell/command_volume_copy.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -47,7 +46,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- ctx := context.Background()
- _, err = copyVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
return
}
diff --git a/weed/shell/command_volume_delete.go b/weed/shell/command_volume_delete.go
index 17d27ea3a..5869b1621 100644
--- a/weed/shell/command_volume_delete.go
+++ b/weed/shell/command_volume_delete.go
@@ -1,7 +1,6 @@
package shell
import (
- "context"
"fmt"
"io"
@@ -42,7 +41,6 @@ func (c *commandVolumeDelete) Do(args []string, commandEnv *CommandEnv, writer i
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return deleteVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 7a1a77cbe..210f4819d 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -50,9 +50,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -113,8 +112,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
break
}
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: volumeInfo.Id,
SourceDataNode: sourceNode.dataNode.Id,
})
diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go
index c6c79d150..c5a9388fa 100644
--- a/weed/shell/command_volume_list.go
+++ b/weed/shell/command_volume_list.go
@@ -32,9 +32,8 @@ func (c *commandVolumeList) Help() string {
func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
diff --git a/weed/shell/command_volume_mount.go b/weed/shell/command_volume_mount.go
index 21bc342b4..cffc7136b 100644
--- a/weed/shell/command_volume_mount.go
+++ b/weed/shell/command_volume_mount.go
@@ -45,14 +45,13 @@ func (c *commandVolumeMount) Do(args []string, commandEnv *CommandEnv, writer io
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return mountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return mountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
-func mountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
+func mountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
return mountErr
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 2e39c0600..c25b953a5 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -59,26 +59,25 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("source and target volume servers are the same!")
}
- ctx := context.Background()
- return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second)
}
// LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
-func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
+func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) {
log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
- lastAppendAtNs, err := copyVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
+ lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer)
if err != nil {
return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
- if err = tailVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
+ if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
}
log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
- if err = deleteVolume(ctx, grpcDialOption, volumeId, sourceVolumeServer); err != nil {
+ if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer); err != nil {
return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
}
@@ -86,10 +85,10 @@ func LiveMoveVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeI
return nil
}
-func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
+func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) {
- err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- resp, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+ err = operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: uint32(volumeId),
SourceDataNode: sourceVolumeServer,
})
@@ -102,10 +101,10 @@ func copyVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
return
}
-func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
+func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
- return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeTailReceiver(ctx, &volume_server_pb.VolumeTailReceiverRequest{
+ return operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
VolumeId: uint32(volumeId),
SinceNs: lastAppendAtNs,
IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
@@ -116,9 +115,9 @@ func tailVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId ne
}
-func deleteVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
+func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
VolumeId: uint32(volumeId),
})
return deleteErr
diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go
index 0f1a1bb6e..756dc4686 100644
--- a/weed/shell/command_volume_tier_download.go
+++ b/weed/shell/command_volume_tier_download.go
@@ -49,18 +49,17 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// collect topology information
- topologyInfo, err := collectTopologyInfo(ctx, commandEnv)
+ topologyInfo, err := collectTopologyInfo(commandEnv)
if err != nil {
return err
}
// volumeId is provided
if vid != 0 {
- return doVolumeTierDownload(ctx, commandEnv, writer, *collection, vid)
+ return doVolumeTierDownload(commandEnv, writer, *collection, vid)
}
// apply to all volumes in the collection
@@ -71,7 +70,7 @@ func (c *commandVolumeTierDownload) Do(args []string, commandEnv *CommandEnv, wr
}
fmt.Printf("tier download volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doVolumeTierDownload(ctx, commandEnv, writer, *collection, vid); err != nil {
+ if err = doVolumeTierDownload(commandEnv, writer, *collection, vid); err != nil {
return err
}
}
@@ -97,7 +96,7 @@ func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection s
return
}
-func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) {
+func doVolumeTierDownload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
@@ -107,7 +106,7 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io
// TODO parallelize this
for _, loc := range locations {
// copy the .dat file from remote tier to local
- err = downloadDatFromRemoteTier(ctx, commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
+ err = downloadDatFromRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, loc.Url)
if err != nil {
return fmt.Errorf("download dat file for volume %d to %s: %v", vid, loc.Url, err)
}
@@ -116,10 +115,10 @@ func doVolumeTierDownload(ctx context.Context, commandEnv *CommandEnv, writer io
return nil
}
-func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
+func downloadDatFromRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, targetVolumeServer string) error {
- err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(ctx, &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
+ err := operation.WithVolumeServerClient(targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, downloadErr := volumeServerClient.VolumeTierMoveDatFromRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatFromRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
})
@@ -145,14 +144,14 @@ func downloadDatFromRemoteTier(ctx context.Context, grpcDialOption grpc.DialOpti
return downloadErr
}
- _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
if unmountErr != nil {
return unmountErr
}
- _, mountErr := volumeServerClient.VolumeMount(ctx, &volume_server_pb.VolumeMountRequest{
+ _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
VolumeId: uint32(volumeId),
})
if mountErr != nil {
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index 20da1187c..5131e8f85 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -67,23 +67,22 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ
return nil
}
- ctx := context.Background()
vid := needle.VolumeId(*volumeId)
// volumeId is provided
if vid != 0 {
- return doVolumeTierUpload(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
+ return doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
}
// apply to all volumes in the collection
// reusing collectVolumeIdsForEcEncode for now
- volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
fmt.Printf("tier upload volumes: %v\n", volumeIds)
for _, vid := range volumeIds {
- if err = doVolumeTierUpload(ctx, commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
+ if err = doVolumeTierUpload(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile); err != nil {
return err
}
}
@@ -91,20 +90,20 @@ func (c *commandVolumeTierUpload) Do(args []string, commandEnv *CommandEnv, writ
return nil
}
-func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
+func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, dest string, keepLocalDatFile bool) (err error) {
// find volume location
locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
if !found {
return fmt.Errorf("volume %d not found", vid)
}
- err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
+ err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
if err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
// copy the .dat file to remote tier
- err = uploadDatToRemoteTier(ctx, commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
+ err = uploadDatToRemoteTier(commandEnv.option.GrpcDialOption, writer, needle.VolumeId(vid), collection, locations[0].Url, dest, keepLocalDatFile)
if err != nil {
return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, locations[0].Url, dest, err)
}
@@ -112,10 +111,10 @@ func doVolumeTierUpload(ctx context.Context, commandEnv *CommandEnv, writer io.W
return nil
}
-func uploadDatToRemoteTier(ctx context.Context, grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
+func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, collection string, sourceVolumeServer string, dest string, keepLocalDatFile bool) error {
- err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(ctx, &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
+ err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, copyErr := volumeServerClient.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
DestinationBackendName: dest,
diff --git a/weed/shell/command_volume_unmount.go b/weed/shell/command_volume_unmount.go
index 826258dfb..6e5bef485 100644
--- a/weed/shell/command_volume_unmount.go
+++ b/weed/shell/command_volume_unmount.go
@@ -45,14 +45,13 @@ func (c *commandVolumeUnmount) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("wrong volume id format %s: %v", volumeId, err)
}
- ctx := context.Background()
- return unmountVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
+ return unmountVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer)
}
-func unmountVolume(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
- return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, unmountErr := volumeServerClient.VolumeUnmount(ctx, &volume_server_pb.VolumeUnmountRequest{
+func unmountVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, unmountErr := volumeServerClient.VolumeUnmount(context.Background(), &volume_server_pb.VolumeUnmountRequest{
VolumeId: uint32(volumeId),
})
return unmountErr
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index f1fcb62d4..31ca31bc3 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -44,7 +44,7 @@ var (
func NewCommandEnv(options ShellOptions) *CommandEnv {
return &CommandEnv{
env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(context.Background(),
+ MasterClient: wdclient.NewMasterClient(
options.GrpcDialOption, "shell", strings.Split(*options.Masters, ",")),
option: options,
}
@@ -60,19 +60,19 @@ func (ce *CommandEnv) parseUrl(input string) (filerServer string, filerPort int6
return ce.option.FilerHost, ce.option.FilerPort, input, err
}
-func (ce *CommandEnv) isDirectory(ctx context.Context, filerServer string, filerPort int64, path string) bool {
+func (ce *CommandEnv) isDirectory(filerServer string, filerPort int64, path string) bool {
- return ce.checkDirectory(ctx, filerServer, filerPort, path) == nil
+ return ce.checkDirectory(filerServer, filerPort, path) == nil
}
-func (ce *CommandEnv) checkDirectory(ctx context.Context, filerServer string, filerPort int64, path string) error {
+func (ce *CommandEnv) checkDirectory(filerServer string, filerPort int64, path string) error {
dir, name := filer2.FullPath(path).DirAndName()
- return ce.withFilerClient(ctx, filerServer, filerPort, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return ce.withFilerClient(filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
- resp, lookupErr := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+ resp, lookupErr := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
Directory: dir,
Name: name,
})