diff options
Diffstat (limited to 'weed/server/volume_grpc_erasure_coding.go')
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 54 |
1 files changed, 41 insertions, 13 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index beda234f7..656d1eec9 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "os" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -16,17 +17,17 @@ import ( Steps to apply erasure coding to .dat .idx files 0. ensure the volume is readonly -1. client call VolumeEcGenerateSlices to generate the .ecx and .ec01~.ec14 files +1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files 2. client ask master for possible servers to hold the ec files, at least 4 servers -3. client call VolumeEcCopy on above target servers to copy ec files from the source server +3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server 4. target servers report the new ec files to the master 5. master stores vid -> [14]*DataNode 6. client checks master. If all 14 slices are ready, delete the original .idx, .idx files - */ +*/ -// VolumeEcGenerateSlices generates the .ecx and .ec01 ~ .ec14 files -func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_server_pb.VolumeEcGenerateSlicesRequest) (*volume_server_pb.VolumeEcGenerateSlicesResponse, error) { +// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files +func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { @@ -34,6 +35,10 @@ func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_ } baseFileName := v.FileName() + if v.Collection != req.Collection { + return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + } + // write .ecx file if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil { return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err) @@ -44,12 +49,11 @@ func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) } - - return &volume_server_pb.VolumeEcGenerateSlicesResponse{}, nil + return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil } -// VolumeEcCopy copy the .ecx and some ec data slices -func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb.VolumeEcCopyRequest) (*volume_server_pb.VolumeEcCopyResponse, error) { +// VolumeEcShardsCopy copy the .ecx and some ec data slices +func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) { location := vs.store.FindFreeLocation() if location == nil { @@ -61,13 +65,13 @@ func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ecx file - if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, ".ecx"); err!=nil{ + if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil { return err } // copy ec data slices for _, ecIndex := range req.EcIndexes { - if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err!=nil{ + if err := vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err != nil { return err } } @@ -75,8 +79,32 @@ func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb. return nil }) if err != nil { - return nil, fmt.Errorf("VolumeEcCopy volume %d: %v", req.VolumeId, err) + return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err) + } + + return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil +} + +// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed, assuming current server has the source volume +func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("volume %d not found", req.VolumeId) + } + baseFileName := v.FileName() + + for _, shardIndex := range req.EcIndexes { + if err := os.Remove(baseFileName + erasure_coding.ToExt(int(shardIndex))); err != nil { + return nil, err + } + } + + if req.ShouldDeleteEcx { + if err := os.Remove(baseFileName + ".ecx"); err != nil { + return nil, err + } } - return &volume_server_pb.VolumeEcCopyResponse{}, nil + return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil } |
