aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fsck.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_fsck.go')
-rw-r--r--weed/shell/command_volume_fsck.go55
1 files changed, 40 insertions, 15 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index e48e53d85..cae8e22d4 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -2,7 +2,9 @@ package shell
import (
"bufio"
+ "bytes"
"context"
+ "errors"
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -11,6 +13,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
@@ -72,6 +75,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging")
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
+ cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -126,7 +130,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
delete(volumeIdToVInfo, volumeId)
continue
}
- err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer)
+ cutoffFrom := time.Now().Add(-*cutoffTimeAgo).UnixNano()
+ err = c.collectOneVolumeFileIds(tempFolder, dataNodeId, volumeId, vinfo, *verbose, writer, uint64(cutoffFrom))
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
@@ -351,7 +356,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
return nil
}
-func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
+func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeId string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer, cutoffFrom uint64) error {
if verbose {
fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
@@ -377,13 +382,42 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, dataNodeI
return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
}
- err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, dataNodeId, volumeId))
+ var buf bytes.Buffer
+ for {
+ resp, err := copyFileClient.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ buf.Write(resp.FileContent)
+ }
+ if vinfo.isReadOnly == false {
+ index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
+ resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
+ VolumeId: volumeId,
+ NeedleId: uint64(key),
+ Offset: offset.ToActualOffset(),
+ Size: int32(size),
+ })
+ if err != nil {
+ return false, fmt.Errorf("to read needle meta with id %d from volume %d with error %v", key, volumeId, err)
+ }
+ return resp.LastModified <= cutoffFrom, nil
+ })
+ if err != nil {
+ fmt.Fprintf(writer, "Failed to search for last vilad index on volume %d with error %v", volumeId, err)
+ }
+ buf.Truncate(index * types.NeedleMapEntrySize)
+ }
+ idxFilename := getVolumeFileIdFile(tempFolder, dataNodeId, volumeId)
+ err = writeToFile(buf.Bytes(), idxFilename)
if err != nil {
return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
}
return nil
-
})
}
@@ -673,7 +707,7 @@ func getFilerFileIdFile(tempFolder string, vid uint32) string {
return filepath.Join(tempFolder, fmt.Sprintf("%d.fid", vid))
}
-func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
+func writeToFile(bytes []byte, fileName string) error {
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
@@ -681,15 +715,6 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
defer dst.Close()
- for {
- resp, receiveErr := client.Recv()
- if receiveErr == io.EOF {
- break
- }
- if receiveErr != nil {
- return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
- }
- dst.Write(resp.FileContent)
- }
+ dst.Write(bytes)
return nil
}