diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-05-20 00:53:17 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-05-20 00:53:17 -0700 |
| commit | fbbc74abb4e1fc57af6c6cd2646e753ae08e760e (patch) | |
| tree | 3247630aa327e108d6da69cb940fb90dc242ab72 /weed/server | |
| parent | ae499fd5aabcba3e66025ab1389cc863ca16ba9c (diff) | |
| download | seaweedfs-fbbc74abb4e1fc57af6c6cd2646e753ae08e760e.tar.xz seaweedfs-fbbc74abb4e1fc57af6c6cd2646e753ae08e760e.zip | |
adds VolumeEcGenerateSlices, VolumeEcCopy
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 62 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 82 |
2 files changed, 114 insertions, 30 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 6bb61dfba..7b681aa53 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "os" "time" @@ -49,37 +50,13 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo volumeFileName = storage.VolumeFileName(volFileInfoResp.Collection, location.Directory, int(req.VolumeId)) // println("source:", volFileInfoResp.String()) - - copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ - VolumeId: req.VolumeId, - Ext: ".idx", - CompactionRevision: volFileInfoResp.CompactionRevision, - StopOffset: volFileInfoResp.IdxFileSize, - }) - if err != nil { - return fmt.Errorf("failed to start copying volume %d idx file: %v", req.VolumeId, err) - } - - idxFileName = volumeFileName + ".idx" - err = writeToFile(copyFileClient, idxFileName, util.NewWriteThrottler(vs.compactionBytePerSecond)) - if err != nil { - return fmt.Errorf("failed to copy volume %d idx file: %v", req.VolumeId, err) - } - - copyFileClient, err = client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ - VolumeId: req.VolumeId, - Ext: ".dat", - CompactionRevision: volFileInfoResp.CompactionRevision, - StopOffset: volFileInfoResp.DatFileSize, - }) - if err != nil { - return fmt.Errorf("failed to start copying volume %d dat file: %v", req.VolumeId, err) + // copy ecx file + if err:=vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err!=nil{ + return err } - datFileName = volumeFileName + ".dat" - err = writeToFile(copyFileClient, datFileName, util.NewWriteThrottler(vs.compactionBytePerSecond)) - if err != nil { - return fmt.Errorf("failed to copy volume %d dat file: %v", req.VolumeId, err) + if err:=vs.doCopyFile(ctx, client, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err!=nil{ + return err } return nil @@ -109,6 +86,28 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }, err } +func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, vid uint32, + compactRevision uint32, stopOffset uint64, baseFileName, ext string) error { + + copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: vid, + Ext: ext, + CompactionRevision: compactRevision, + StopOffset: stopOffset, + }) + if err != nil { + return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) + } + + err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond)) + if err != nil { + return fmt.Errorf("failed to copy volume %d %s file: %v", vid, ext, err) + } + + return nil + +} + /** only check the the differ of the file size todo: maybe should check the received count and deleted count of the volume @@ -175,6 +174,9 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se return resp, nil } +// CopyFile client pulls the volume related file from the source server. +// if req.CompactionRevision != math.MaxUint32, it ensures the compact revision is as expected +// The copying still stop at req.StopOffset, but you can set it to math.MaxUint64 in order to read all data. func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) @@ -182,7 +184,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v return fmt.Errorf("not found volume id %d", req.VolumeId) } - if uint32(v.CompactionRevision) != req.CompactionRevision { + if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { return fmt.Errorf("volume %d is compacted", req.VolumeId) } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go new file mode 100644 index 000000000..beda234f7 --- /dev/null +++ b/weed/server/volume_grpc_erasure_coding.go @@ -0,0 +1,82 @@ +package weed_server + +import ( + "context" + "fmt" + "math" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +/* + +Steps to apply erasure coding to .dat .idx files +0. ensure the volume is readonly +1. client call VolumeEcGenerateSlices to generate the .ecx and .ec01~.ec14 files +2. client ask master for possible servers to hold the ec files, at least 4 servers +3. client call VolumeEcCopy on above target servers to copy ec files from the source server +4. target servers report the new ec files to the master +5. master stores vid -> [14]*DataNode +6. client checks master. If all 14 slices are ready, delete the original .idx, .idx files + + */ + +// VolumeEcGenerateSlices generates the .ecx and .ec01 ~ .ec14 files +func (vs *VolumeServer) VolumeEcGenerateSlices(ctx context.Context, req *volume_server_pb.VolumeEcGenerateSlicesRequest) (*volume_server_pb.VolumeEcGenerateSlicesResponse, error) { + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("volume %d not found", req.VolumeId) + } + baseFileName := v.FileName() + + // write .ecx file + if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil { + return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err) + } + + // write .ec01 ~ .ec14 files + if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + } + + + return &volume_server_pb.VolumeEcGenerateSlicesResponse{}, nil +} + +// VolumeEcCopy copy the .ecx and some ec data slices +func (vs *VolumeServer) VolumeEcCopy(ctx context.Context, req *volume_server_pb.VolumeEcCopyRequest) (*volume_server_pb.VolumeEcCopyResponse, error) { + + location := vs.store.FindFreeLocation() + if location == nil { + return nil, fmt.Errorf("no space left") + } + + baseFileName := storage.VolumeFileName(req.Collection, location.Directory, int(req.VolumeId)) + + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + // copy ecx file + if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, ".ecx"); err!=nil{ + return err + } + + // copy ec data slices + for _, ecIndex := range req.EcIndexes { + if err:=vs.doCopyFile(ctx, client, req.VolumeId, math.MaxUint32, math.MaxUint64, baseFileName, erasure_coding.ToExt(int(ecIndex))); err!=nil{ + return err + } + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("VolumeEcCopy volume %d: %v", req.VolumeId, err) + } + + return &volume_server_pb.VolumeEcCopyResponse{}, nil +} |
