diff options
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 82 |
1 files changed, 56 insertions, 26 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index a54a1e343..cf9f9f777 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,10 +3,11 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" + "io/ioutil" "math" "os" - "path" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -20,17 +21,20 @@ import ( const BufferSizeLimit = 1024 * 1024 * 2 -// VolumeCopy copy the .idx .dat files, and mount the volume +// VolumeCopy copy the .idx .dat .vif files, and mount the volume func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v != nil { - return nil, fmt.Errorf("volume %d already exists", req.VolumeId) - } - location := vs.store.FindFreeLocation() - if location == nil { - return nil, fmt.Errorf("no space left") + glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId) + + err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) + if err != nil { + return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err) + } + + glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId) } // the master will not start compaction for read-only volumes, so it is safe to just copy files directly @@ -40,10 +44,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // send .dat file // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse - var volumeFileName, idxFileName, datFileName string + var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error - volFileInfoResp, err = client.ReadVolumeFileStatus(ctx, + volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{ VolumeId: req.VolumeId, }) @@ -51,33 +55,55 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo return fmt.Errorf("read volume file status failed, %v", err) } - volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) + diskType := volFileInfoResp.DiskType + if req.DiskType != "" { + diskType = req.DiskType + } + location := vs.store.FindFreeLocation(types.ToDiskType(diskType)) + if location == nil { + return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString()) + } + + dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) + indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId)) + + ioutil.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) // println("source:", volFileInfoResp.String()) - // copy ecx file - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { + return err + } + + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { return err } - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { return err } + os.Remove(dataBaseFileName + ".note") + return nil }) - idxFileName = volumeFileName + ".idx" - datFileName = volumeFileName + ".dat" + if err != nil { + return nil, err + } + if dataBaseFileName == "" { + return nil, fmt.Errorf("not found volume %d file", req.VolumeId) + } + + idxFileName = indexBaseFileName + ".idx" + datFileName = dataBaseFileName + ".dat" - if err != nil && volumeFileName != "" { - if idxFileName != "" { + defer func() { + if err != nil && dataBaseFileName != "" { os.Remove(idxFileName) - } - if datFileName != "" { os.Remove(datFileName) + os.Remove(dataBaseFileName + ".vif") } - return nil, err - } + }() if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16 return nil, err @@ -94,10 +120,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }, err } -func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, - compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error { +func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error { - copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, Ext: ext, CompactionRevision: compactRevision, @@ -186,6 +211,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se resp.FileCount = v.FileCount() resp.CompactionRevision = uint32(v.CompactionRevision) resp.Collection = v.Collection + resp.DiskType = string(v.DiskType()) return resp, nil } @@ -204,11 +230,15 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { return fmt.Errorf("volume %d is compacted", req.VolumeId) } - fileName = v.FileName() + req.Ext + fileName = v.FileName(req.Ext) } else { baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext for _, location := range vs.store.Locations { - tName := path.Join(location.Directory, baseFileName) + tName := util.Join(location.Directory, baseFileName) + if util.FileExists(tName) { + fileName = tName + } + tName = util.Join(location.IdxDirectory, baseFileName) if util.FileExists(tName) { fileName = tName } |
