aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_copy.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
-rw-r--r--weed/server/volume_grpc_copy.go55
1 files changed, 41 insertions, 14 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 84a9035ca..4f99c24b4 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -131,7 +131,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
nextReportTarget = processed + reportInterval
}
return true
- }, throttler); err != nil {
+ }, throttler, 0); err != nil { // regular volumes use generation 0
return err
}
if sendErr != nil {
@@ -142,14 +142,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
}
- if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler); err != nil {
+ if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil, throttler, 0); err != nil { // regular volumes use generation 0
return err
}
if modifiedTsNs > 0 {
os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
}
- if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler); err != nil {
+ if modifiedTsNs, err = vs.doCopyFileWithThrottler(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, 1024*1024, dataBaseFileName, ".vif", false, true, nil, throttler, 0); err != nil { // regular volumes use generation 0
return err
}
if modifiedTsNs > 0 {
@@ -199,10 +199,14 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
- return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond))
+ return vs.doCopyFileWithGeneration(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, 0)
}
-func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) {
+func (vs *VolumeServer) doCopyFileWithGeneration(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, generation uint32) (modifiedTsNs int64, err error) {
+ return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond), generation)
+}
+
+func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler, generation uint32) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -212,6 +216,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe
Collection: collection,
IsEcVolume: isEcVolume,
IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
+ Generation: generation, // pass generation to source server
})
if err != nil {
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
@@ -332,22 +337,29 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
v.SyncToDisk()
fileName = v.FileName(req.Ext)
} else {
- baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
+ // Use generation-aware filename for EC volumes
+ generation := req.Generation
for _, location := range vs.store.Locations {
- tName := util.Join(location.Directory, baseFileName)
+ // Try data directory with generation-aware naming
+ baseFileName := erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.Directory, int(req.VolumeId), generation)
+ tName := baseFileName + req.Ext
if util.FileExists(tName) {
fileName = tName
+ break
}
- tName = util.Join(location.IdxDirectory, baseFileName)
+ // Try index directory with generation-aware naming
+ baseFileName = erasure_coding.EcShardFileNameWithGeneration(req.Collection, location.IdxDirectory, int(req.VolumeId), generation)
+ tName = baseFileName + req.Ext
if util.FileExists(tName) {
fileName = tName
+ break
}
}
if fileName == "" {
if req.IgnoreSourceFileNotFound {
return nil
}
- return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
+ return fmt.Errorf("CopyFile not found ec volume id %d generation %d", req.VolumeId, generation)
}
}
@@ -442,8 +454,8 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive
case *volume_server_pb.ReceiveFileRequest_Info:
// First message contains file info
fileInfo = data.Info
- glog.V(1).Infof("ReceiveFile: volume %d, ext %s, collection %s, shard %d, size %d",
- fileInfo.VolumeId, fileInfo.Ext, fileInfo.Collection, fileInfo.ShardId, fileInfo.FileSize)
+ glog.V(1).Infof("ReceiveFile: volume %d, ext %s, collection %s, shard %d, size %d, generation %d",
+ fileInfo.VolumeId, fileInfo.Ext, fileInfo.Collection, fileInfo.ShardId, fileInfo.FileSize, fileInfo.Generation)
// Create file path based on file info
if fileInfo.IsEcVolume {
@@ -465,9 +477,24 @@ func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_Receive
})
}
- // Create EC shard file path
- baseFileName := erasure_coding.EcShardBaseFileName(fileInfo.Collection, int(fileInfo.VolumeId))
- filePath = util.Join(targetLocation.Directory, baseFileName+fileInfo.Ext)
+ // Create generation-aware EC shard file path
+ // Use index directory for index files (.ecx, .ecj, .vif), data directory for shard files
+ var baseDir string
+ if fileInfo.Ext == ".ecx" || fileInfo.Ext == ".ecj" || fileInfo.Ext == ".vif" {
+ baseDir = targetLocation.IdxDirectory
+ } else {
+ baseDir = targetLocation.Directory
+ }
+
+ baseFileName := erasure_coding.EcShardFileNameWithGeneration(
+ fileInfo.Collection,
+ baseDir,
+ int(fileInfo.VolumeId),
+ fileInfo.Generation,
+ )
+ filePath = baseFileName + fileInfo.Ext
+
+ glog.V(1).Infof("ReceiveFile: creating generation-aware EC file %s", filePath)
} else {
// Regular volume file
v := vs.store.GetVolume(needle.VolumeId(fileInfo.VolumeId))