aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_copy.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-09-01 02:42:57 -0700
committerChris Lu <chris.lu@gmail.com>2021-09-01 02:42:57 -0700
commitd1a4e19a3f0aeaabcf87aba3c99880f82bde87a3 (patch)
tree9c3ad654a79095cc9389d3b7220d66fe924f9a61 /weed/server/volume_grpc_copy.go
parent3bd48c4f2925a725b347dff4e4d66928591e1598 (diff)
downloadseaweedfs-d1a4e19a3f0aeaabcf87aba3c99880f82bde87a3.tar.xz
seaweedfs-d1a4e19a3f0aeaabcf87aba3c99880f82bde87a3.zip
volume: copy file also copies modification time
to ensure ttl can work well
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
-rw-r--r--weed/server/volume_grpc_copy.go49
1 files changed, 35 insertions, 14 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 2ad77a7ff..20b81c422 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -79,17 +79,27 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}()
// println("source:", volFileInfoResp.String())
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
+ var modifiedTsNs int64
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(dataBaseFileName + ".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(indexBaseFileName + ".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(dataBaseFileName + ".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
os.Remove(dataBaseFileName + ".note")
@@ -129,7 +139,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}, err
}
-func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -141,15 +151,15 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
- return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
+ return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
- err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
+ modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
if err != nil {
- return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
+ return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
- return nil
+ return modifiedTsNs, nil
}
@@ -157,7 +167,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
only check the the differ of the file size
todo: maybe should check the received count and deleted count of the volume
*/
-func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
+func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) (error) {
stat, err := os.Stat(idxFileName)
if err != nil {
return fmt.Errorf("stat idx file %s failed: %v", idxFileName, err)
@@ -178,7 +188,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
return nil
}
-func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) {
glog.V(4).Infof("writing to %s", fileName)
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if isAppend {
@@ -186,7 +196,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
- return nil
+ return modifiedTsNs, nil
}
defer dst.Close()
@@ -195,13 +205,16 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
if receiveErr == io.EOF {
break
}
+ if resp.ModifiedTsNs != 0 {
+ modifiedTsNs = resp.ModifiedTsNs
+ }
if receiveErr != nil {
- return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
+ return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr)
}
dst.Write(resp.FileContent)
wt.MaybeSlowdown(int64(len(resp.FileContent)))
}
- return nil
+ return modifiedTsNs, nil
}
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
@@ -271,6 +284,12 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
defer file.Close()
+ fileInfo, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ fileModTsNs := fileInfo.ModTime().UnixNano()
+
buffer := make([]byte, BufferSizeLimit)
for bytesToRead > 0 {
@@ -290,12 +309,14 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
bytesread = int(bytesToRead)
}
err = stream.Send(&volume_server_pb.CopyFileResponse{
- FileContent: buffer[:bytesread],
+ FileContent: buffer[:bytesread],
+ ModifiedTsNs: fileModTsNs,
})
if err != nil {
// println("sending", bytesread, "bytes err", err.Error())
return err
}
+ fileModTsNs = 0 // only send once
bytesToRead -= int64(bytesread)