aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
-rw-r--r--weed/server/volume_grpc_copy.go82
1 files changed, 56 insertions, 26 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index a54a1e343..cf9f9f777 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,10 +3,11 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
+ "io/ioutil"
"math"
"os"
- "path"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -20,17 +21,20 @@ import (
const BufferSizeLimit = 1024 * 1024 * 2
-// VolumeCopy copy the .idx .dat files, and mount the volume
+// VolumeCopy copy the .idx .dat .vif files, and mount the volume
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v != nil {
- return nil, fmt.Errorf("volume %d already exists", req.VolumeId)
- }
- location := vs.store.FindFreeLocation()
- if location == nil {
- return nil, fmt.Errorf("no space left")
+ glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
+
+ err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
+ if err != nil {
+ return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
+ }
+
+ glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
@@ -40,10 +44,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// send .dat file
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
- var volumeFileName, idxFileName, datFileName string
+ var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
- volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
+ volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: req.VolumeId,
})
@@ -51,33 +55,55 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
- volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ diskType := volFileInfoResp.DiskType
+ if req.DiskType != "" {
+ diskType = req.DiskType
+ }
+ location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
+ if location == nil {
+ return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString())
+ }
+
+ dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
+
+ ioutil.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
// println("source:", volFileInfoResp.String())
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
+ os.Remove(dataBaseFileName + ".note")
+
return nil
})
- idxFileName = volumeFileName + ".idx"
- datFileName = volumeFileName + ".dat"
+ if err != nil {
+ return nil, err
+ }
+ if dataBaseFileName == "" {
+ return nil, fmt.Errorf("not found volume %d file", req.VolumeId)
+ }
+
+ idxFileName = indexBaseFileName + ".idx"
+ datFileName = dataBaseFileName + ".dat"
- if err != nil && volumeFileName != "" {
- if idxFileName != "" {
+ defer func() {
+ if err != nil && dataBaseFileName != "" {
os.Remove(idxFileName)
- }
- if datFileName != "" {
os.Remove(datFileName)
+ os.Remove(dataBaseFileName + ".vif")
}
- return nil, err
- }
+ }()
if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
return nil, err
@@ -94,10 +120,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}, err
}
-func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
- copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
+ copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
Ext: ext,
CompactionRevision: compactRevision,
@@ -186,6 +211,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
resp.FileCount = v.FileCount()
resp.CompactionRevision = uint32(v.CompactionRevision)
resp.Collection = v.Collection
+ resp.DiskType = string(v.DiskType())
return resp, nil
}
@@ -204,11 +230,15 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
return fmt.Errorf("volume %d is compacted", req.VolumeId)
}
- fileName = v.FileName() + req.Ext
+ fileName = v.FileName(req.Ext)
} else {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
for _, location := range vs.store.Locations {
- tName := path.Join(location.Directory, baseFileName)
+ tName := util.Join(location.Directory, baseFileName)
+ if util.FileExists(tName) {
+ fileName = tName
+ }
+ tName = util.Join(location.IdxDirectory, baseFileName)
if util.FileExists(tName) {
fileName = tName
}