author    Chris Lu <chris.lu@gmail.com>  2019-06-01 01:41:22 -0700
committer Chris Lu <chris.lu@gmail.com>  2019-06-01 01:41:22 -0700
commit    ba18314aab2531ff6d7223cb2cf976dc01aaa735 (patch)
tree      335f87f5bcd0b48168de0150cb5e7e9cec6554e6
parent    f919d0235cff244941e4e9f8c51bf89224d3d0ab (diff)
ec shard delete also checks ec volumes, in addition to volumes
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go  | 45
-rw-r--r--  weed/shell/command_ec_balance.go            | 27
-rw-r--r--  weed/shell/command_ec_encode.go             | 24
-rw-r--r--  weed/storage/disk_location_ec.go            |  2
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go    | 17
5 files changed, 86 insertions(+), 29 deletions(-)
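
In short, VolumeEcShardsDelete now tries two paths: it first deletes shard files directly on disk when the original volume is still present locally, and otherwise falls back to the ec volume mounted in the store. A minimal sketch of the new dispatch, condensed from the handler in the diff below:

    func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
    	// on-disk path: applies when this server still has the source volume
    	foundExistingVolume, err := vs.doDeleteUnmountedShards(ctx, req)
    	if err != nil {
    		return nil, err
    	}
    	if !foundExistingVolume {
    		// no local source volume: delete through the mounted ec volume instead
    		err = vs.doDeleteMountedShards(ctx, req)
    	}
    	return &volume_server_pb.VolumeEcShardsDeleteResponse{}, err
    }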
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 2ef89a040..c0d666b6d 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -90,25 +90,62 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// VolumeEcShardsDelete deletes the .ecx file and the ec data slices that are no longer needed, assuming the current server has the source volume
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
+ foundExistingVolume, err := vs.doDeleteUnmountedShards(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ if !foundExistingVolume {
+ err = vs.doDeleteMountedShards(ctx, req)
+ }
+
+ return &volume_server_pb.VolumeEcShardsDeleteResponse{}, err
+}
+
+// doDeleteUnmountedShards removes ec shard files directly from disk, for the case where the current server still has the source volume
+func (vs *VolumeServer) doDeleteUnmountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (foundVolume bool, err error) {
+
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
- return nil, fmt.Errorf("volume %d not found", req.VolumeId)
+ return false, nil
}
+
baseFileName := v.FileName()
for _, shardId := range req.ShardIds {
if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil {
- return nil, err
+ return true, err
}
}
if req.ShouldDeleteEcx {
if err := os.Remove(baseFileName + ".ecx"); err != nil {
- return nil, err
+ return true, err
}
}
- return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
+ return true, nil
+}
+
+// doDeleteMountedShards deletes shards from the mounted ec volume, destroying the ec volume itself once no shards remain
+func (vs *VolumeServer) doDeleteMountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) error {
+
+ ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ for _, shardId := range req.ShardIds {
+ if shard, found := ecv.DeleteEcVolumeShard(erasure_coding.ShardId(shardId)); found {
+ shard.Destroy()
+ }
+ }
+
+ if len(ecv.Shards) == 0 {
+ ecv.Destroy()
+ }
+
+ return nil
}
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
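
For context, the shell commands reach this handler through the volume server gRPC client. A minimal invocation sketch, assuming an already-dialed volume_server_pb client; the ShouldDeleteEcx flag follows the rule sourceServerDeleteEcShards uses below, dropping the .ecx index only when every shard of the volume is being deleted:

    _, err := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
    	VolumeId:        uint32(volumeId),
    	ShardIds:        toBeDeletedShardIds,
    	ShouldDeleteEcx: len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount,
    })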
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 779d9ecd7..a3d097036 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -159,7 +159,7 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
if isOverLimit {
- if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
+ if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
return err
}
@@ -170,7 +170,7 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
return nil
}
-func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
for _, ecNode := range existingLocations {
@@ -185,7 +185,7 @@ func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv,
fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
- err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, vid, shardId, possibleDestinationEcNodes, applyBalancing)
+ err := pickOneEcNodeAndMoveOneShard(ctx, commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
if err != nil {
return err
}
@@ -197,7 +197,7 @@ func spreadShardsIntoMoreDataNodes(ctx context.Context, commandEnv *commandEnv,
return nil
}
-func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, existingLocation *EcNode, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
+func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
sortEcNodes(possibleDestinationEcNodes)
@@ -215,7 +215,7 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
- err := moveOneShardToEcNode(ctx, commandEnv, existingLocation, vid, shardId, destEcNode, applyBalancing)
+ err := moveOneShardToEcNode(ctx, commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
if err != nil {
return err
}
@@ -227,10 +227,23 @@ func pickOneEcNodeAndMoveOneShard(ctx context.Context, commandEnv *commandEnv, a
return nil
}
-func moveOneShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
+func moveOneShardToEcNode(ctx context.Context, commandEnv *commandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
- return nil
+
+ if !applyBalancing {
+ return nil
+ }
+
+ // ask the destination node to copy the shard and the .ecx file from the source node, and mount the shard
+ copiedShardIds, err := oneServerCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
+ if err != nil {
+ return err
+ }
+
+ // ask the source node to delete the shard, and possibly the .ecx file
+ return sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
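
With the collection name threaded through the call chain, moveOneShardToEcNode now performs a real move: copy to the destination, then delete from the source. A usage sketch with hypothetical values (srcNode, destNode, and the collection name are placeholders; applyBalancing=true actually executes the move):

    err := moveOneShardToEcNode(ctx, commandEnv, srcNode, "pictures",
    	needle.VolumeId(7), erasure_coding.ShardId(3), destNode, true)
    if err != nil {
    	return err
    }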
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 7ecd7bb8c..14d1ae96b 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -69,7 +69,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *commandEnv, writer io.Wr
}
// apply to all volumes in the collection
- volumeIds, err := collectVolumeForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *quietPeriod)
if err != nil {
return err
}
@@ -143,7 +143,7 @@ func balanceEcShards(ctx context.Context, commandEnv *commandEnv, volumeId needl
}
// ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0], copiedShardIds)
+ err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
if err != nil {
return fmt.Errorf("sourceServerDeleteEcShards %s %d.%v: %v", existingLocations[0], volumeId, copiedShardIds, err)
}
@@ -177,7 +177,7 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
go func(server *EcNode, startFromShardId uint32, shardCount int) {
defer wg.Done()
copiedShardIds, copyErr := oneServerCopyEcShardsFromSource(ctx, grpcDialOption, server,
- startFromShardId, shardCount, volumeId, collection, existingLocation)
+ startFromShardId, shardCount, volumeId, collection, existingLocation.Url)
if copyErr != nil {
err = copyErr
} else {
@@ -203,23 +203,23 @@ func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Dia
func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
targetServer *EcNode, startFromShardId uint32, shardCount int,
- volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (copiedShardIds []uint32, err error) {
+ volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
var shardIdsToCopy []uint32
for shardId := startFromShardId; shardId < startFromShardId+uint32(shardCount); shardId++ {
- fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation.Url, targetServer.info.Id)
+ fmt.Printf("allocate %d.%d %s => %s\n", volumeId, shardId, existingLocation, targetServer.info.Id)
shardIdsToCopy = append(shardIdsToCopy, shardId)
}
err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- if targetServer.info.Id != existingLocation.Url {
+ if targetServer.info.Id != existingLocation {
_, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
- SourceDataNode: existingLocation.Url,
+ SourceDataNode: existingLocation,
})
if copyErr != nil {
return copyErr
@@ -235,9 +235,9 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
return mountErr
}
- if targetServer.info.Id != existingLocation.Url {
+ if targetServer.info.Id != existingLocation {
copiedShardIds = shardIdsToCopy
- glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation.Url, volumeId, copiedShardIds)
+ glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
}
return nil
@@ -251,11 +251,11 @@ func oneServerCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.Di
}
func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- volumeId needle.VolumeId, sourceLocation wdclient.Location, toBeDeletedShardIds []uint32) error {
+ volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
shouldDeleteEcx := len(toBeDeletedShardIds) == erasure_coding.TotalShardsCount
- return operation.WithVolumeServerClient(sourceLocation.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeDeletedShardIds,
@@ -349,7 +349,7 @@ func collectEcNodes(ctx context.Context, commandEnv *commandEnv) (ecNodes []*EcN
return
}
-func collectVolumeForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *commandEnv, selectedCollection string, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
var resp *master_pb.VolumeListResponse
err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index ff72baf33..21f26e093 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -73,7 +73,7 @@ func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding
if !found {
return false
}
- if deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted {
+ if _, deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted {
if len(ecVolume.Shards) == 0 {
delete(l.ecVolumes, vid)
}
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 8bd7237f5..44d8ca80f 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -61,7 +61,7 @@ func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
return true
}
-func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) bool {
+func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
foundPosition := -1
for i, s := range ev.Shards {
if s.ShardId == shardId {
@@ -69,11 +69,13 @@ func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) bool {
}
}
if foundPosition < 0 {
- return false
+ return nil, false
}
+ ecVolumeShard = ev.Shards[foundPosition]
+
ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
- return true
+ return ecVolumeShard, true
}
func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
@@ -99,11 +101,16 @@ func (ev *EcVolume) Destroy() {
ev.Close()
- baseFileName := EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
for _, s := range ev.Shards {
s.Destroy()
}
- os.Remove(baseFileName + ".ecx")
+ os.Remove(ev.FileName() + ".ecx")
+}
+
+func (ev *EcVolume) FileName() string {
+ return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
}
func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
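
The new EcVolume.FileName helper gives callers a single place to build file paths for an ec volume. A small sketch from a caller's perspective, combining it with the erasure_coding.ToExt helper already used by the server code above (shard id 3 is just an example):

    shardFileName := ev.FileName() + erasure_coding.ToExt(3) // the shard's data slice, e.g. ".ec03"
    ecxFileName := ev.FileName() + ".ecx"                    // the needle index shared by all shards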