aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
-rw-r--r--weed/server/volume_grpc_copy.go45
1 files changed, 26 insertions, 19 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 711a3ebad..5cc75e74c 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -20,7 +20,7 @@ import (
const BufferSizeLimit = 1024 * 1024 * 2
-// VolumeCopy copy the .idx .dat files, and mount the volume
+// VolumeCopy copy the .idx .dat .vif files, and mount the volume
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -43,7 +43,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
var volumeFileName, idxFileName, datFileName string
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
- volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
+ volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: req.VolumeId,
})
@@ -55,11 +55,15 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// println("source:", volFileInfoResp.String())
// copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil {
return err
}
@@ -70,12 +74,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
datFileName = volumeFileName + ".dat"
if err != nil && volumeFileName != "" {
- if idxFileName != "" {
- os.Remove(idxFileName)
- }
- if datFileName != "" {
- os.Remove(datFileName)
- }
+ os.Remove(idxFileName)
+ os.Remove(datFileName)
+ os.Remove(volumeFileName + ".vif")
return nil, err
}
@@ -94,16 +95,16 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}, err
}
-func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
- copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: vid,
- Ext: ext,
- CompactionRevision: compactRevision,
- StopOffset: stopOffset,
- Collection: collection,
- IsEcVolume: isEcVolume,
+ copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: vid,
+ Ext: ext,
+ CompactionRevision: compactRevision,
+ StopOffset: stopOffset,
+ Collection: collection,
+ IsEcVolume: isEcVolume,
+ IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
@@ -213,6 +214,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
}
if fileName == "" {
+ if req.IgnoreSourceFileNotFound {
+ return nil
+ }
return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
}
}
@@ -221,6 +225,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
file, err := os.Open(fileName)
if err != nil {
+ if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
+ return nil
+ }
return err
}
defer file.Close()