aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_encode.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--weed/shell/command_ec_encode.go15
1 files changed, 11 insertions, 4 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 499c8a32e..ac42b520d 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -8,6 +8,7 @@ import (
"sort"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -196,9 +197,10 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
targetServer *master_pb.DataNodeInfo, startFromShardId uint32, shardCount uint32,
volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) {
+ var shardIdsToCopy []uint32
for shardId := startFromShardId; shardId < startFromShardId+shardCount; shardId++ {
fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.Id)
- copiedShardIds = append(copiedShardIds, shardId)
+ shardIdsToCopy = append(shardIdsToCopy, shardId)
}
err = operation.WithVolumeServerClient(targetServer.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -208,7 +210,7 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
- ShardIds: copiedShardIds,
+ ShardIds: shardIdsToCopy,
SourceDataNode: existingLocation.Url,
})
if copyErr != nil {
@@ -219,12 +221,17 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
_, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
- ShardIds: copiedShardIds,
+ ShardIds: shardIdsToCopy,
})
if mountErr != nil {
return mountErr
}
+ if targetServer.Id != existingLocation.Url {
+ copiedShardIds = shardIdsToCopy
+ glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds)
+ }
+
return nil
})
@@ -243,7 +250,7 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt
return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
- ShardIds: toBeDeletedShardIds,
+ ShardIds: toBeDeletedShardIds,
ShouldDeleteEcx: shouldDeleteEcx,
})
return deleteErr