diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-06-19 22:57:14 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-06-19 22:57:14 -0700 |
| commit | 856da7aae2adefe7c25f68c792f9ed03977a4a0e (patch) | |
| tree | 6a9b8dafc533d4e8f2243fb80427aee807ca6b86 /weed/server | |
| parent | 115558e5f529f119ca4ba60a45a28fc7bb8d25ae (diff) | |
| download | seaweedfs-856da7aae2adefe7c25f68c792f9ed03977a4a0e.tar.xz seaweedfs-856da7aae2adefe7c25f68c792f9ed03977a4a0e.zip | |
ec volume support deletes
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 16 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 17 |
2 files changed, 25 insertions, 8 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 89788d3c3..1ec69b43f 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // println("source:", volFileInfoResp.String()) // copy ecx file - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil { return err } - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil { return err } @@ -95,7 +95,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, - compactRevision uint32, stopOffset uint64, baseFileName, ext string) error { + compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error { copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -109,7 +109,7 @@ func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb. return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond)) + err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) if err != nil { return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } @@ -143,9 +143,13 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse return nil } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler) error { +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error { glog.V(4).Infof("writing to %s", fileName) - dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + flags := os.O_WRONLY|os.O_CREATE|os.O_TRUNC + if isAppend { + flags = os.O_WRONLY|os.O_CREATE + } + dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { return nil } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index ab1310c4a..e676337e6 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -74,6 +74,11 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s } else { rebuiltShardIds = generatedShardIds } + + if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil { + return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err) + } + break } } @@ -97,7 +102,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil { return err } } @@ -107,7 +112,12 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv } // copy ecx file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil { + return err + } + + // copy ecj file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil { return err } @@ -166,6 +176,9 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se if err := os.Remove(baseFilename + ".ecx"); err != nil { return nil, err } + if err := os.Remove(baseFilename + ".ecj"); err != nil { + return nil, err + } } return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil |
