diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-01 02:42:57 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-01 02:42:57 -0700 |
| commit | d1a4e19a3f0aeaabcf87aba3c99880f82bde87a3 (patch) | |
| tree | 9c3ad654a79095cc9389d3b7220d66fe924f9a61 /weed/server/volume_grpc_copy.go | |
| parent | 3bd48c4f2925a725b347dff4e4d66928591e1598 (diff) | |
| download | seaweedfs-d1a4e19a3f0aeaabcf87aba3c99880f82bde87a3.tar.xz seaweedfs-d1a4e19a3f0aeaabcf87aba3c99880f82bde87a3.zip | |
volume: copy file also copies modification time
to ensure ttl can work well
Diffstat (limited to 'weed/server/volume_grpc_copy.go')
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 49 |
1 files changed, 35 insertions, 14 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 2ad77a7ff..20b81c422 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -79,17 +79,27 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }() // println("source:", volFileInfoResp.String()) - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { + var modifiedTsNs int64 + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(dataBaseFileName + ".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(indexBaseFileName + ".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(dataBaseFileName + ".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } os.Remove(dataBaseFileName + ".note") @@ -129,7 +139,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }, err } -func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error { +func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) { copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -141,15 +151,15 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i IgnoreSourceFileNotFound: ignoreSourceFileNotFound, }) if err != nil { - return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) + return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) + modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) if err != nil { - return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) + return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } - return nil + return modifiedTsNs, nil } @@ -157,7 +167,7 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i only check the the differ of the file size todo: maybe should check the received count and deleted count of the volume */ -func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error { +func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) (error) { stat, err := os.Stat(idxFileName) if err != nil { return fmt.Errorf("stat idx file %s failed: %v", idxFileName, err) @@ -178,7 +188,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse return nil } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error { +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) { glog.V(4).Infof("writing to %s", fileName) flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC if isAppend { @@ -186,7 +196,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s } dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { - return nil + return modifiedTsNs, nil } defer dst.Close() @@ -195,13 +205,16 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s if receiveErr == io.EOF { break } + if resp.ModifiedTsNs != 0 { + modifiedTsNs = resp.ModifiedTsNs + } if receiveErr != nil { - return fmt.Errorf("receiving %s: %v", fileName, receiveErr) + return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr) } dst.Write(resp.FileContent) wt.MaybeSlowdown(int64(len(resp.FileContent))) } - return nil + return modifiedTsNs, nil } func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { @@ -271,6 +284,12 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v } defer file.Close() + fileInfo, err := file.Stat() + if err != nil { + return err + } + fileModTsNs := fileInfo.ModTime().UnixNano() + buffer := make([]byte, BufferSizeLimit) for bytesToRead > 0 { @@ -290,12 +309,14 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v bytesread = int(bytesToRead) } err = stream.Send(&volume_server_pb.CopyFileResponse{ - FileContent: buffer[:bytesread], + FileContent: buffer[:bytesread], + ModifiedTsNs: fileModTsNs, }) if err != nil { // println("sending", bytesread, "bytes err", err.Error()) return err } + fileModTsNs = 0 // only send once bytesToRead -= int64(bytesread) |
