diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-06-03 02:26:31 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-06-03 02:26:31 -0700 |
| commit | 7e80b2b8823a9bb8bac58100a76d6a5825c94be4 (patch) | |
| tree | f406744ba5e7302157de46a59b4e5c09abff067f /weed/server/volume_grpc_erasure_coding.go | |
| parent | 55be09996d8f82e461e1c464db82707c982b2b57 (diff) | |
| download | seaweedfs-7e80b2b8823a9bb8bac58100a76d6a5825c94be4.tar.xz seaweedfs-7e80b2b8823a9bb8bac58100a76d6a5825c94be4.zip | |
fix multiple bugs
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 116 |
1 file changed, 63 insertions, 53 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index a2ba10323..da2146ccb 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "io" + "io/ioutil" "math" "os" + "path" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" @@ -13,6 +16,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) /* @@ -54,6 +58,30 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil } +// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files +func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) { + + baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + + var rebuiltShardIds []uint32 + + for _, location := range vs.store.Locations { + if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) { + // write .ec01 ~ .ec14 files + if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err) + } else { + rebuiltShardIds = generatedShardIds + } + break + } + } + + return &volume_server_pb.VolumeEcShardsRebuildResponse{ + RebuiltShardIds: rebuiltShardIds, + }, nil +} + // VolumeEcShardsCopy copy the .ecx and some ec data slices func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) { @@ -62,22 +90,26 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx 
context.Context, req *volume_serv return nil, fmt.Errorf("no space left") } - baseFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId)) + baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // copy ecx file - if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil { - return err - } - // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil { return err } } + if !req.CopyEcxFile { + return nil + } + + // copy ecx file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil { + return err + } + return nil }) if err != nil { @@ -87,65 +119,43 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil } -// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume +// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed +// the shard should not be mounted before calling this. 
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { - foundExistingVolume, err := vs.doDeleteUnmountedShards(ctx, req) - if err != nil { - return nil, err - } - - if !foundExistingVolume { - err = vs.doDeleteMountedShards(ctx, req) - } + baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) - return &volume_server_pb.VolumeEcShardsDeleteResponse{}, err -} - -// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume -func (vs *VolumeServer) doDeleteUnmountedShards(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (foundVolume bool, err error) { - - v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) - if v == nil { - return false, nil + for _, shardId := range req.ShardIds { + os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) } - baseFileName := v.FileName() + // check whether to delete the ecx file also + hasEcxFile := false + existingShardCount := 0 - for _, shardId := range req.ShardIds { - if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardId))); err != nil { - return true, err + for _, location := range vs.store.Locations { + fileInfos, err := ioutil.ReadDir(location.Directory) + if err != nil { + continue } - } - - if req.ShouldDeleteEcx { - if err := os.Remove(baseFileName + ".ecx"); err != nil { - return true, err + for _, fileInfo := range fileInfos { + if fileInfo.Name() == baseFilename+".ecx" { + hasEcxFile = true + continue + } + if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") { + existingShardCount++ + } } } - return true, nil -} - -// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume -func (vs *VolumeServer) doDeleteMountedShards(ctx context.Context, req 
*volume_server_pb.VolumeEcShardsDeleteRequest) (error) { - - ecv, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) - if !found { - return fmt.Errorf("volume %d not found", req.VolumeId) - } - - for _, shardId := range req.ShardIds { - if shard, found := ecv.DeleteEcVolumeShard(erasure_coding.ShardId(shardId)); found { - shard.Destroy() + if hasEcxFile && existingShardCount == 0 { + if err := os.Remove(baseFilename + ".ecx"); err != nil { + return nil, err } } - if len(ecv.Shards) == 0 { - vs.store.DestroyEcVolume(needle.VolumeId(req.VolumeId)) - } - - return nil + return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil } func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) { |
