aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_fs_verify.go
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2024-06-19 18:50:55 +0500
committerGitHub <noreply@github.com>2024-06-19 06:50:55 -0700
commit7988ee08057afd1da73dc8981895e7930b55ec60 (patch)
treeba4025b559d6ad99e3a5af2a9d7eac2d0922cf51 /weed/shell/command_fs_verify.go
parentece018b1a94c80cf12696fb51c7f0ce7edad5dd6 (diff)
downloadseaweedfs-7988ee08057afd1da73dc8981895e7930b55ec60.tar.xz
seaweedfs-7988ee08057afd1da73dc8981895e7930b55ec60.zip
[fs.verify] skip failed files if entry not found on filerStore (#5693)
Diffstat (limited to 'weed/shell/command_fs_verify.go')
-rw-r--r--weed/shell/command_fs_verify.go75
1 files changed, 54 insertions, 21 deletions
diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
index 1337ae090..b30020c40 100644
--- a/weed/shell/command_fs_verify.go
+++ b/weed/shell/command_fs_verify.go
@@ -1,6 +1,7 @@
package shell
import (
+ "bytes"
"context"
"flag"
"fmt"
@@ -89,17 +90,18 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
defer close(c.waitChan[volumeServerStr])
}
}
- var fCount, eConut uint64
+ var fCount, eCount uint64
if *c.metadataFromLog {
- itemErrCount := atomic.NewUint64(0)
var wg sync.WaitGroup
- fCount, err = c.verifyProcessMetadata(path, itemErrCount, &wg)
+ fCount, eCount, err = c.verifyProcessMetadata(path, &wg)
wg.Wait()
- eConut = itemErrCount.Load()
+ if err != nil {
+ return err
+ }
} else {
- fCount, eConut, err = c.verifyTraverseBfs(path)
+ fCount, eCount, err = c.verifyTraverseBfs(path)
}
- fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
+ fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eCount)
return err
}
@@ -143,19 +145,44 @@ type ItemEntry struct {
path util.FullPath
}
-func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic.Uint64, wg *sync.WaitGroup) (fileCount uint64, err error) {
+func (c *commandFsVerify) verifyProcessMetadata(path string, wg *sync.WaitGroup) (fileCount uint64, errCount uint64, err error) {
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
- if resp.EventNotification.NewEntry == nil || len(message.NewEntry.Chunks) == 0 {
+ if resp.EventNotification.NewEntry == nil {
+ return nil
+ }
+ chunkCount := len(message.NewEntry.Chunks)
+ if chunkCount == 0 {
return nil
}
entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name)
- if c.verifyEntry(entryPath, message.NewEntry.Chunks, errorCount, wg) {
- if *c.verbose {
- fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, len(message.NewEntry.Chunks))
+ errorChunksCount := atomic.NewUint64(0)
+ if !c.verifyEntry(entryPath, message.NewEntry.Chunks, errorChunksCount, wg) {
+ if err = c.env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ entryResp, errReq := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+ Directory: message.NewParentPath,
+ Name: message.NewEntry.Name,
+ })
+ if strings.HasSuffix(errReq.Error(), "no entry is found in filer store") {
+ return nil
+ } else if errReq != nil {
+ return errReq
+ }
+ if entryResp.Entry.Attributes.Mtime == message.NewEntry.Attributes.Mtime &&
+ bytes.Equal(entryResp.Entry.Attributes.Md5, message.NewEntry.Attributes.Md5) {
+ fmt.Fprintf(c.writer, "file: %s needles:%d failed:%d\n", entryPath, chunkCount, errorChunksCount.Load())
+ errCount++
+ }
+ return nil
+ }); err != nil {
+ return err
}
- fileCount++
+ return nil
+ }
+ if *c.verbose {
+ fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, chunkCount)
}
+ fileCount++
return nil
}
metadataFollowOption := &pb.MetadataFollowOption{
@@ -168,9 +195,9 @@ func (c *commandFsVerify) verifyProcessMetadata(path string, errorCount *atomic.
DirectoriesToWatch: nil,
StartTsNs: time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(),
StopTsNs: time.Now().UnixNano(),
- EventErrorType: pb.TrivialOnError,
+ EventErrorType: pb.DontLogError,
}
- return fileCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn)
+ return fileCount, errCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn)
}
func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool {
@@ -181,8 +208,10 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk,
for _, volumeServer := range volumeIds {
if *c.concurrency == 0 {
if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil {
- fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
- fileMsg, chunk.GetFileIdString(), err)
+ if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) {
+ fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
+ fileMsg, chunk.GetFileIdString(), err)
+ }
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
errorCount.Add(1)
@@ -207,8 +236,10 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk,
go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
defer wg.Done()
if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil {
- fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
- msg, fChunk.GetFileIdString(), err)
+ if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) {
+ fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
+ msg, fChunk.GetFileIdString(), err)
+ }
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
errorCount.Add(1)
@@ -218,9 +249,11 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk,
}(chunk, path, volumeServer, fileMsg)
}
} else {
- err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
- fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
- fileMsg, chunk.GetFileIdString(), err)
+ if !*c.metadataFromLog {
+ err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
+ fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
+ fileMsg, chunk.GetFileIdString(), err)
+ }
if itemIsVerifed.Load() {
itemIsVerifed.Store(false)
errorCount.Add(1)