aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-09-13 01:55:49 -0700
committerChris Lu <chris.lu@gmail.com>2021-09-13 01:55:49 -0700
commit6cd1ce8b749a5befc0af67feaced94000f6fb7bf (patch)
treee752edc4a0c98e061eb74bf1745f9e3162550db4
parentf74b29416a95adfa2ed1aafcee34125dcdb48737 (diff)
downloadseaweedfs-6cd1ce8b749a5befc0af67feaced94000f6fb7bf.tar.xz
seaweedfs-6cd1ce8b749a5befc0af67feaced94000f6fb7bf.zip
erasure coding: add cleanup step if anything goes wrong
-rw-r--r--weed/shell/command_ec_encode.go14
1 files changed, 14 insertions, 0 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 39ca39a4f..fd01ac700 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -203,6 +203,14 @@ func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer
server.addEcVolumeShards(volumeId, collection, copiedShardIds)
}
}
+ cleanupFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
+ if err := unmountEcShards(grpcDialOption, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
+ fmt.Printf("unmount aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
+ }
+ if err := sourceServerDeleteEcShards(grpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
+ fmt.Printf("remove aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
+ }
+ }
// maybe parallelize
for i, server := range targetServers {
@@ -221,6 +229,12 @@ func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServer
close(shardIdChan)
if err != nil {
+ for i, server := range targetServers {
+ if len(allocatedEcIds[i]) <= 0 {
+ continue
+ }
+ cleanupFunc(server, allocatedEcIds[i])
+ }
return nil, err
}