aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeyed Mahdi Sadegh Shobeiri <36403983+SmsS4@users.noreply.github.com>2023-12-23 23:47:30 +0330
committerGitHub <noreply@github.com>2023-12-23 12:17:30 -0800
commit97236389e8a7e72680c4508cdc1db3746dfd8c76 (patch)
tree52ec7d216771f72abf83c953b533c60884c2b8df
parent54ba2c886845f567e72f317053ec1b49ad3d72a1 (diff)
downloadseaweedfs-97236389e8a7e72680c4508cdc1db3746dfd8c76.tar.xz
seaweedfs-97236389e8a7e72680c4508cdc1db3746dfd8c76.zip
Add modifyTimeAgo to volume.fsck (#5133)
* Add modifyTimeAgo to volume.fsck * Fix AppendAtNs
-rw-r--r--weed/shell/command_volume_fsck.go23
1 files changed, 17 insertions, 6 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 7a3932aa1..8916e90bd 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -89,6 +89,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
+ modifyTimeAgo := fsckCommand.Duration("modifyTimeAgo", 0, "only include entries after this modify time to check orphan chunks")
c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "check needles status from volume server")
if err = fsckCommand.Parse(args); err != nil {
@@ -137,6 +138,10 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
}
collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano()
+ var collectModifyFromAtNs int64 = 0
+ if modifyTimeAgo.Seconds() != 0 {
+ collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano()
+ }
// collect each volume file ids
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo {
@@ -150,7 +155,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
delete(volumeIdToVInfo, volumeId)
continue
}
- err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectCutoffFromAtNs))
+ err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
@@ -163,7 +168,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
if *c.findMissingChunksInFiler {
// collect all filer file ids and paths
- if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectCutoffFromAtNs); err != nil {
+ if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, *purgeAbsent, collectModifyFromAtNs, collectCutoffFromAtNs); err != nil {
return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
}
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
@@ -174,7 +179,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
}
} else {
// collect all filer file ids
- if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, 0); err != nil {
+ if err = c.collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo, false, 0, 0); err != nil {
return fmt.Errorf("failed to collect file ids from filer: %v", err)
}
// volume file ids subtract filer file ids
@@ -186,7 +191,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
-func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, cutoffFromAtNs int64) error {
+func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, purgeAbsent bool, collectModifyFromAtNs int64, cutoffFromAtNs int64) error {
if *c.verbose {
fmt.Fprintf(c.writer, "checking each file from filer path %s...\n", c.getCollectFilerFilePath())
}
@@ -224,6 +229,9 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
if cutoffFromAtNs != 0 && chunk.ModifiedTsNs > cutoffFromAtNs {
continue
}
+ if collectModifyFromAtNs != 0 && chunk.ModifiedTsNs < collectModifyFromAtNs {
+ continue
+ }
outputChan <- &Item{
vid: chunk.Fid.VolumeId,
fileKey: chunk.Fid.FileKey,
@@ -371,7 +379,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
return nil
}
-func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, cutoffFrom uint64) error {
+func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, modifyFrom uint64, cutoffFrom uint64) error {
if *c.verbose {
fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
@@ -420,7 +428,10 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
if err != nil {
return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
}
- return resp.AppendAtNs <= cutoffFrom, nil
+ if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) {
+ return true, nil
+ }
+ return false, nil
})
if err != nil {
fmt.Fprintf(c.writer, "Failed to search for last valid index on volume %d with error %v\n", volumeId, err)