diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-02-25 21:50:12 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-02-25 21:50:12 -0800 |
| commit | 892e726eb9c2427634c46f8ae9b7bcf0b6d1b082 (patch) | |
| tree | 3bf821356579902219633c6f6d42739deb1edd2d /weed/shell/command_ec_rebuild.go | |
| parent | bd3254b53f78b8f42e31ea50cbf2e0d7e87b2bbc (diff) | |
| download | seaweedfs-892e726eb9c2427634c46f8ae9b7bcf0b6d1b082.tar.xz seaweedfs-892e726eb9c2427634c46f8ae9b7bcf0b6d1b082.zip | |
avoid reusing context object
fix https://github.com/chrislusf/seaweedfs/issues/1182
Diffstat (limited to 'weed/shell/command_ec_rebuild.go')
| -rw-r--r-- | weed/shell/command_ec_rebuild.go | 29 |
1 files changed, 13 insertions, 16 deletions
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 600a8cb45..d9d943e6d 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -64,7 +64,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W } // collect all ec nodes - allEcNodes, _, err := collectEcNodes(context.Background(), commandEnv, "") + allEcNodes, _, err := collectEcNodes(commandEnv, "") if err != nil { return err } @@ -92,8 +92,6 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, writer io.Writer, applyChanges bool) error { - ctx := context.Background() - fmt.Printf("rebuildEcVolumes %s\n", collection) // collect vid => each shard locations, similar to ecShardMap in topology.go @@ -117,7 +115,7 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s return fmt.Errorf("disk space is not enough") } - if err := rebuildOneEcVolume(ctx, commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil { + if err := rebuildOneEcVolume(commandEnv, allEcNodes[0], collection, vid, locations, writer, applyChanges); err != nil { return err } } @@ -125,13 +123,13 @@ func rebuildEcVolumes(commandEnv *CommandEnv, allEcNodes []*EcNode, collection s return nil } -func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { +func rebuildOneEcVolume(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyChanges bool) error { fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId) // collect shard files to rebuilder local disk var generatedShardIds []uint32 - copiedShardIds, _, err := prepareDataToRecover(ctx, commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges) + copiedShardIds, _, err := prepareDataToRecover(commandEnv, rebuilder, collection, volumeId, locations, writer, applyChanges) if err != nil { return err } @@ -139,7 +137,7 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * // clean up working files // ask the rebuilder to delete the copied shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds) + err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, copiedShardIds) if err != nil { fmt.Fprintf(writer, "%s delete copied ec shards %s %d.%v\n", rebuilder.info.Id, collection, volumeId, copiedShardIds) } @@ -151,13 +149,13 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * } // generate ec shards, and maybe ecx file - generatedShardIds, err = generateMissingShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) + generatedShardIds, err = generateMissingShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id) if err != nil { return err } // mount the generated shards - err = mountEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) + err = mountEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, rebuilder.info.Id, generatedShardIds) if err != nil { return err } @@ -167,11 +165,10 @@ func rebuildOneEcVolume(ctx context.Context, commandEnv *CommandEnv, rebuilder * return nil } -func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, - collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { +func generateMissingShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string) (rebuiltShardIds []uint32, err error) { - err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(ctx, &volume_server_pb.VolumeEcShardsRebuildRequest{ + err = operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, rebultErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{ VolumeId: uint32(volumeId), Collection: collection, }) @@ -183,7 +180,7 @@ func generateMissingShards(ctx context.Context, grpcDialOption grpc.DialOption, return } -func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { +func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations, writer io.Writer, applyBalancing bool) (copiedShardIds []uint32, localShardIds []uint32, err error) { needEcxFile := true var localShardBits erasure_coding.ShardBits @@ -209,8 +206,8 @@ func prepareDataToRecover(ctx context.Context, commandEnv *CommandEnv, rebuilder var copyErr error if applyBalancing { - copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { - _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ + copyErr = operation.WithVolumeServerClient(rebuilder.info.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: []uint32{uint32(shardId)}, |
