diff options
Diffstat (limited to 'weed/shell/command_ec_encode.go')
| -rw-r--r-- | weed/shell/command_ec_encode.go | 49 |
1 files changed, 23 insertions, 26 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 587b59388..e22691c00 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -63,22 +63,21 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } - ctx := context.Background() vid := needle.VolumeId(*volumeId) // volumeId is provided if vid != 0 { - return doEcEncode(ctx, commandEnv, *collection, vid) + return doEcEncode(commandEnv, *collection, vid) } // apply to all volumes in the collection - volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod) + volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod) if err != nil { return err } fmt.Printf("ec encode volumes: %v\n", volumeIds) for _, vid := range volumeIds { - if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil { + if err = doEcEncode(commandEnv, *collection, vid); err != nil { return err } } @@ -86,7 +85,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr return nil } -func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { +func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) { // find volume location locations, found := commandEnv.MasterClient.GetLocations(uint32(vid)) if !found { @@ -96,19 +95,19 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, // fmt.Printf("found ec %d shards on %v\n", vid, locations) // mark the volume as readonly - err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) + err = markVolumeReadonly(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations) if err != nil { return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err) } // generate ec shards - err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) + err = generateEcShards(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url) if err != nil { return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err) } // balance the ec shards to current cluster - err = spreadEcShards(ctx, commandEnv, vid, collection, locations) + err = spreadEcShards(context.Background(), commandEnv, vid, collection, locations) if err != nil { return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err) } @@ -116,12 +115,12 @@ func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, return nil } -func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { +func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error { for _, location := range locations { - err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{ + err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ VolumeId: uint32(volumeId), }) return markErr @@ -136,10 +135,10 @@ func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, vol return nil } -func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { +func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error { - err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{ + err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{ VolumeId: uint32(volumeId), Collection: collection, }) @@ -152,7 +151,7 @@ func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volum func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) { - allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "") + allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "") if err != nil { return err } @@ -169,26 +168,26 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle allocatedEcIds := balancedEcDistribution(allocatedDataNodes) // ask the data nodes to copy from the source volume server - copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) + copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0]) if err != nil { return err } // unmount the to be deleted shards - err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) + err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return err } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err) } // ask the source volume server to delete the original volume for _, location := range existingLocations { - err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url) + err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url) if err != nil { return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err) } @@ -198,9 +197,7 @@ func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle } -func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, - targetServers []*EcNode, allocatedEcIds [][]uint32, - volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { +func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) { // parallelize shardIdChan := make(chan []uint32, len(targetServers)) @@ -213,7 +210,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia wg.Add(1) go func(server *EcNode, allocatedEcShardIds []uint32) { defer wg.Done() - copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server, + copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server, allocatedEcShardIds, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr @@ -255,11 +252,11 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) { return allocated } -func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { |
