diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-06-01 01:41:22 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-06-01 01:41:22 -0700 |
| commit | ba18314aab2531ff6d7223cb2cf976dc01aaa735 (patch) | |
| tree | 335f87f5bcd0b48168de0150cb5e7e9cec6554e6 /weed/shell/command_ec_encode.go | |
| parent | f919d0235cff244941e4e9f8c51bf89224d3d0ab (diff) | |
| download | seaweedfs-ba18314aab2531ff6d7223cb2cf976dc01aaa735.tar.xz seaweedfs-ba18314aab2531ff6d7223cb2cf976dc01aaa735.zip | |
ec shard delete also check ec volumes, in addition to volumes
Diffstat (limited to 'weed/shell/command_ec_encode.go')
| -rw-r--r-- | weed/shell/command_ec_encode.go | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index 7ecd7bb8c..14d1ae96b 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -69,7 +69,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr } // apply to all volumes in the collection - volumeIds, err := collectVolumeForEcEncode(ctx, commandEnv, *collection, *quietPeriod) + volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod) if err != nil { return err } @@ -143,7 +143,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl } // ask the source volume server to clean up copied ec shards - err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds) + err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds) if err != nil { return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err) } @@ -177,7 +177,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia go func(server *EcNode, startFromShardId uint32, shardCount int) { defer wg.Done() copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server, - startFromShardId, shardCount, volumeId, collection, existingLocation) + startFromShardId, shardCount, volumeId, collection, existingLocation.Url) if copyErr != nil { err = copyErr } else { @@ -203,23 +203,23 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption, targetServer *EcNode, startFromShardId uint32, shardCount int, - volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) { + volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) { var shardIdsToCopy []uint32 for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ { - fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.info.Id) + fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id) shardIdsToCopy = append(shardIdsToCopy, shardId) } err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - if targetServer.info.Id != existingLocation.Url { + if targetServer.info.Id != existingLocation { _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{ VolumeId: uint32(volumeId), Collection: collection, ShardIds: shardIdsToCopy, - SourceDataNode: existingLocation.Url, + SourceDataNode: existingLocation, }) if copyErr != nil { return copyErr @@ -235,9 +235,9 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di return mountErr } - if targetServer.info.Id != existingLocation.Url { + if targetServer.info.Id != existingLocation { copiedShardIds = shardIdsToCopy - glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds) + glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds) } return nil @@ -251,11 +251,11 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di } func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption, - volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error { + volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error { shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount - return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{ VolumeId: uint32(volumeId), ShardIds: toBeDeletedShardIds, @@ -349,7 +349,7 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN return } -func collectVolumeForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { +func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { var resp *master_pb.VolumeListResponse err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { |
