aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_erasure_coding.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
-rw-r--r--weed/server/volume_grpc_erasure_coding.go116
1 files changed, 63 insertions, 53 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index a2ba10323..da2146ccb 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -4,8 +4,11 @@ import (
"context"
"fmt"
"io"
+ "io/ioutil"
"math"
"os"
+ "path"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -13,6 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
/*
@@ -54,6 +58,30 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
+// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
+
+ baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+
+ var rebuiltShardIds []uint32
+
+ for _, location := range vs.store.Locations {
+ if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
+ // write .ec01 ~ .ec14 files
+ if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
+ } else {
+ rebuiltShardIds = generatedShardIds
+ }
+ break
+ }
+ }
+
+ return &volume_server_pb.VolumeEcShardsRebuildResponse{
+ RebuiltShardIds: rebuiltShardIds,
+ }, nil
+}
+
// VolumeEcShardsCopy copy the .ecx and some ec data slices
func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
@@ -62,22 +90,26 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
return nil, fmt.Errorf("no space left")
}
- baseFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId))
+ baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil {
- return err
- }
-
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil {
return err
}
}
+ if !req.CopyEcxFile {
+ return nil
+ }
+
+ // copy ecx file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil {
+ return err
+ }
+
return nil
})
if err != nil {
@@ -87,65 +119,43 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
}
-// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
+// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
+// the shard should not be mounted before calling this.
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
- foundExistingVolume, err := vs.doDeleteUnmountedShards(ctx, req)
- if err != nil {
- return nil, err
- }
-
- if !foundExistingVolume {
- err = vs.doDeleteMountedShards(ctx, req)
- }
+ baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
- return &volume_server_pb.VolumeEcShardsDeleteResponse{}, err
-}
-
-// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
-func (vs *VolumeServer) doDeleteUnmountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (foundVolume bool, err error) {
-
- v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
- if v == nil {
- return false, nil
+ for _, shardId := range req.ShardIds {
+ os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
}
- baseFileName := v.FileName()
+ // check whether to delete the ecx file also
+ hasEcxFile := false
+ existingShardCount := 0
- for _, shardId := range req.ShardIds {
- if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil {
- return true, err
+ for _, location := range vs.store.Locations {
+ fileInfos, err := ioutil.ReadDir(location.Directory)
+ if err != nil {
+ continue
}
- }
-
- if req.ShouldDeleteEcx {
- if err := os.Remove(baseFileName + ".ecx"); err != nil {
- return true, err
+ for _, fileInfo := range fileInfos {
+ if fileInfo.Name() == baseFilename+".ecx" {
+ hasEcxFile = true
+ continue
+ }
+ if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
+ existingShardCount++
+ }
}
}
- return true, nil
-}
-
-// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume
-func (vs *VolumeServer) doDeleteMountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (error) {
-
- ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
- if !found {
- return fmt.Errorf("volume %d not found", req.VolumeId)
- }
-
- for _, shardId := range req.ShardIds {
- if shard, found := ecv.DeleteEcVolumeShard(erasure_coding.ShardId(shardId)); found {
- shard.Destroy()
+ if hasEcxFile && existingShardCount == 0 {
+ if err := os.Remove(baseFilename + ".ecx"); err != nil {
+ return nil, err
}
}
- if len(ecv.Shards) == 0 {
- vs.store.DestroyEcVolume(needle.VolumeId(req.VolumeId))
- }
-
- return nil
+ return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {