aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-12-22 04:54:17 +0500
committerGitHub <noreply@github.com>2022-12-21 15:54:17 -0800
commit569bc7b54df308975169151b2ced7ab5339e9c5a (patch)
tree85907258b147287bf36078eefbb192421fa6a943
parent77b7c9f7ccce5ac9e4b0cb22b6018e5758a4c380 (diff)
downloadseaweedfs-569bc7b54df308975169151b2ced7ab5339e9c5a.tar.xz
seaweedfs-569bc7b54df308975169151b2ced7ab5339e9c5a.zip
fs.verify needles status of file (#4060)
* fs.verify needles status of file * fix parse args * refactor logging * set modifyTimeAgoAtSec
-rw-r--r--weed/shell/command_fs_verify.go177
1 files changed, 177 insertions, 0 deletions
diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
new file mode 100644
index 000000000..11cc64d78
--- /dev/null
+++ b/weed/shell/command_fs_verify.go
@@ -0,0 +1,177 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "io"
+ "math"
+ "strings"
+ "time"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsVerify{})
+}
+
+type commandFsVerify struct {
+ env *CommandEnv
+ volumeIds map[uint32][]pb.ServerAddress
+ verbose *bool
+ modifyTimeAgoAtSec int64
+ writer io.Writer
+}
+
+func (c *commandFsVerify) Name() string {
+ return "fs.verify"
+}
+
+func (c *commandFsVerify) Help() string {
+ return `recursively verify all files under a directory
+
+ fs.verify [-v] [-modifyTimeAgo 1h] /buckets/dir
+
+`
+}
+
+func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ c.env = commandEnv
+ c.writer = writer
+
+ fsVerifyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files")
+ modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify")
+
+ if err = fsVerifyCommand.Parse(args); err != nil {
+ return err
+ }
+
+ path, parseErr := commandEnv.parseUrl(findInputDirectory(fsVerifyCommand.Args()))
+ if parseErr != nil {
+ return parseErr
+ }
+
+ c.modifyTimeAgoAtSec = int64(modifyTimeAgo.Seconds())
+
+ if err := c.collectVolumeIds(); err != nil {
+ return parseErr
+ }
+
+ fCount, eConut, terr := c.verifyTraverseBfs(path)
+
+ if terr == nil {
+ fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
+ }
+
+ return terr
+
+}
+
+func (c *commandFsVerify) collectVolumeIds() error {
+ c.volumeIds = make(map[uint32][]pb.ServerAddress)
+ topologyInfo, _, err := collectTopologyInfo(c.env, 0)
+ if err != nil {
+ return err
+ }
+ eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) {
+ for _, diskInfo := range nodeInfo.DiskInfos {
+ for _, vi := range diskInfo.VolumeInfos {
+ c.volumeIds[vi.Id] = append(c.volumeIds[vi.Id], pb.NewServerAddressFromDataNode(nodeInfo))
+ }
+ }
+ })
+ return nil
+}
+
+func (c *commandFsVerify) verifyEntry(fileId *filer_pb.FileId, volumeServer *pb.ServerAddress) error {
+ err := operation.WithVolumeServerClient(false, *volumeServer, c.env.option.GrpcDialOption,
+ func(client volume_server_pb.VolumeServerClient) error {
+ _, err := client.VolumeNeedleStatus(context.Background(),
+ &volume_server_pb.VolumeNeedleStatusRequest{
+ VolumeId: fileId.VolumeId,
+ NeedleId: fileId.FileKey})
+ return err
+ },
+ )
+ if err != nil && !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
+ return err
+ }
+ if *c.verbose {
+ fmt.Fprintf(c.writer, ".")
+ }
+ return nil
+}
+
+type ItemEntry struct {
+ chunks []*filer_pb.FileChunk
+ path util.FullPath
+}
+
+func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) {
+ timeNowAtSec := time.Now().Unix()
+ return fileCount, errCount, doTraverseBfsAndSaving(c.env, nil, path, false,
+ func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ if c.modifyTimeAgoAtSec > 0 {
+ if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {
+ return nil
+ }
+ }
+ dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
+ if resolveErr != nil {
+ return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
+ }
+ dataChunks = append(dataChunks, manifestChunks...)
+ if len(dataChunks) > 0 {
+ outputChan <- &ItemEntry{
+ chunks: dataChunks,
+ path: util.NewFullPath(entry.Dir, entry.Entry.Name),
+ }
+ }
+ return nil
+ },
+ func(outputChan chan interface{}) {
+ for itemEntry := range outputChan {
+ i := itemEntry.(*ItemEntry)
+ fileMsg := fmt.Sprintf("file:%s needle status ", i.path)
+ if *c.verbose {
+ fmt.Fprintf(c.writer, fileMsg)
+ fileMsg = ""
+ }
+ for _, chunk := range i.chunks {
+ if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
+ for _, volumeServer := range volumeIds {
+ if err = c.verifyEntry(chunk.Fid, &volumeServer); err != nil {
+ fmt.Fprintf(c.writer, "%sfailed verify %d:%d: %+v\n",
+ fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
+ break
+ }
+ }
+ } else {
+ err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
+ fmt.Fprintf(c.writer, "%sfailed verify chunk %d:%d: %+v\n",
+ fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
+ break
+ }
+ }
+
+ if err != nil {
+ errCount++
+ continue
+ }
+
+ if *c.verbose {
+ fmt.Fprintf(c.writer, " verifed\n")
+ }
+ fileCount++
+ }
+ })
+
+}