aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_erasure_coding.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
-rw-r--r--weed/server/volume_grpc_erasure_coding.go86
1 files changed, 49 insertions, 37 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index ed8f49596..3d1de1a4f 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -173,24 +173,37 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
- glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
+ glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)
- found := false
- var indexBaseFilename, dataBaseFilename string
for _, location := range vs.store.Locations {
- if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
- found = true
- indexBaseFilename = path.Join(location.IdxDirectory, bName)
- dataBaseFilename = path.Join(location.Directory, bName)
- for _, shardId := range req.ShardIds {
- os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId)))
+ if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
+ glog.Errorf("deleteEcShards from %s %s.%v: %v", location, bName, req.ShardIds, err)
+ return nil, err
+ }
+ }
+
+ return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
+}
+
+func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) error {
+
+ found := false
+
+ indexBaseFilename := path.Join(location.IdxDirectory, bName)
+ dataBaseFilename := path.Join(location.Directory, bName)
+
+ if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
+ for _, shardId := range shardIds {
+ shardFileName := dataBaseFilename + erasure_coding.ToExt(int(shardId))
+ if util.FileExists(shardFileName) {
+ found = true
+ os.Remove(shardFileName)
}
- break
}
}
if !found {
- return nil, nil
+ return nil
}
// check whether to delete the .ecx and .ecj file also
@@ -198,45 +211,44 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
hasIdxFile := false
existingShardCount := 0
- for _, location := range vs.store.Locations {
- fileInfos, err := os.ReadDir(location.Directory)
+ fileInfos, err := os.ReadDir(location.Directory)
+ if err != nil {
+ return err
+ }
+ if location.IdxDirectory != location.Directory {
+ idxFileInfos, err := os.ReadDir(location.IdxDirectory)
if err != nil {
+ return err
+ }
+ fileInfos = append(fileInfos, idxFileInfos...)
+ }
+ for _, fileInfo := range fileInfos {
+ if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
+ hasEcxFile = true
continue
}
- if location.IdxDirectory != location.Directory {
- idxFileInfos, err := os.ReadDir(location.IdxDirectory)
- if err != nil {
- continue
- }
- fileInfos = append(fileInfos, idxFileInfos...)
+ if fileInfo.Name() == bName+".idx" {
+ hasIdxFile = true
+ continue
}
- for _, fileInfo := range fileInfos {
- if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
- hasEcxFile = true
- continue
- }
- if fileInfo.Name() == bName+".idx" {
- hasIdxFile = true
- continue
- }
- if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
- existingShardCount++
- }
+ if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
+ existingShardCount++
}
}
if hasEcxFile && existingShardCount == 0 {
if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
- return nil, err
+ return err
}
os.Remove(indexBaseFilename + ".ecj")
- }
- if !hasIdxFile {
- // .vif is used for ec volumes and normal volumes
- os.Remove(dataBaseFilename + ".vif")
+
+ if !hasIdxFile {
+ // .vif is used for ec volumes and normal volumes
+ os.Remove(dataBaseFilename + ".vif")
+ }
}
- return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
+ return nil
}
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {