diff options
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 55 |
1 files changed, 41 insertions, 14 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 84a9035ca..4f99c24b4 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -131,7 +131,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre nextReportTarget = processed + reportInterval } return true - }, throttler); err != nil { + }, throttler, 0); err != nil { // regular volumes use generation 0 return err } if sendErr != nil { @@ -142,14 +142,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre } } - if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil { + if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler, 0); err != nil { // regular volumes use generation 0 return err } if modifiedTsNs > 0 { os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) } - if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil { + if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler, 0); err != nil { // regular volumes use generation 0 return err } if modifiedTsNs > 0 { @@ -199,10 +199,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre } func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { - return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond)) + return vs.doCopyFileWithGeneration(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, 0) } -func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) { +func (vs *VolumeServer) doCopyFileWithGeneration(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, generation uint32) (modifiedTsNs int64, err error) { + return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond), generation) +} + +func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler, generation uint32) (modifiedTsNs int64, err error) { copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -212,6 +216,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe Collection: collection, IsEcVolume: isEcVolume, IgnoreSourceFileNotFound: ignoreSourceFileNotFound, + Generation: generation, // pass generation to source server }) if err != nil { return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) @@ -332,22 +337,29 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v v.SyncToDisk() fileName = v.FileName(req.Ext) } else { - baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext + // Use generation-aware filename for EC volumes + generation := req.Generation for _, location := range vs.store.Locations { - tName := util.Join(location.Directory, baseFileName) + // Try data directory with generation-aware naming + baseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.Directory, int(req.VolumeId), generation) + tName := baseFileName + req.Ext if util.FileExists(tName) { fileName = tName + break } - tName = util.Join(location.IdxDirectory, baseFileName) + // Try index directory with generation-aware naming + baseFileName = erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.IdxDirectory, int(req.VolumeId), generation) + tName = baseFileName + req.Ext if util.FileExists(tName) { fileName = tName + break } } if fileName == "" { if req.IgnoreSourceFileNotFound { return nil } - return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId) + return fmt.Errorf("CopyFile not found ec volume id %d generation %d", req.VolumeId, generation) } } @@ -442,8 +454,8 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive case *volume_server_pb.ReceiveFileRequest_Info: // First message contains file info fileInfo = data.Info - glog.V(1).Infof("ReceiveFile: volume %d, ext %s, collection %s, shard %d, size %d", - fileInfo.VolumeId, fileInfo.Ext, fileInfo.Collection, fileInfo.ShardId, fileInfo.FileSize) + glog.V(1).Infof("ReceiveFile: volume %d, ext %s, collection %s, shard %d, size %d, generation %d", + fileInfo.VolumeId, fileInfo.Ext, fileInfo.Collection, fileInfo.ShardId, fileInfo.FileSize, fileInfo.Generation) // Create file path based on file info if fileInfo.IsEcVolume { @@ -465,9 +477,24 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive }) } - // Create EC shard file path - baseFileName := erasure_coding.EcShardBaseFileName(fileInfo.Collection, int(fileInfo.VolumeId)) - filePath = util.Join(targetLocation.Directory, baseFileName+fileInfo.Ext) + // Create generation-aware EC shard file path + // Use index directory for index files (.ecx, .ecj, .vif), data directory for shard files + var baseDir string + if fileInfo.Ext == ".ecx" || fileInfo.Ext == ".ecj" || fileInfo.Ext == ".vif" { + baseDir = targetLocation.IdxDirectory + } else { + baseDir = targetLocation.Directory + } + + baseFileName := erasure_coding.EcShardFileNameWithGeneration( + fileInfo.Collection, + baseDir, + int(fileInfo.VolumeId), + fileInfo.Generation, + ) + filePath = baseFileName + fileInfo.Ext + + glog.V(1).Infof("ReceiveFile: creating generation-aware EC file %s", filePath) } else { // Regular volume file v := vs.store.GetVolume(needle.VolumeId(fileInfo.VolumeId)) |
