diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-10-16 08:38:46 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-10-15 20:38:46 -0700 |
| commit | 7836f7574e65bd9b7a8b8f1beda97b21cd279da0 (patch) | |
| tree | 4b28917f4f66918d4bda7fdfa74b4f25b513a03c /weed/shell/command_volume_fsck.go | |
| parent | f476cf340321faf495afe41082fc3d511e44819f (diff) | |
| download | seaweedfs-7836f7574e65bd9b7a8b8f1beda97b21cd279da0.tar.xz seaweedfs-7836f7574e65bd9b7a8b8f1beda97b21cd279da0.zip | |
[volume.fsck] hotfix apply purging and add option verifyNeedle #3860 (#3861)
* fix apply purging and add verifyNeedle
* common readSourceNeedleBlob
* use consts
Diffstat (limited to 'weed/shell/command_volume_fsck.go')
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 38 |
1 files changed, 32 insertions, 6 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 70c6cd91c..1750ae94c 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -34,6 +34,11 @@ func init() { Commands = append(Commands, &commandVolumeFsck{}) } +const ( + readbufferSize = 16 + verifyProbeBlobSize = 16 +) + type commandVolumeFsck struct { env *CommandEnv writer io.Writer @@ -44,6 +49,7 @@ type commandVolumeFsck struct { verbose *bool forcePurging *bool findMissingChunksInFiler *bool + verifyNeedle *bool } func (c *commandVolumeFsck) Name() string { @@ -82,6 +88,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") cutoffTimeAgo := fsckCommand.Duration("cutoffTimeAgo", 5*time.Minute, "only include entries on volume servers before this cutoff time to check orphan chunks") + c.verifyNeedle = fsckCommand.Bool("verifyNeedles", false, "try get head needle blob from volume server") if err = fsckCommand.Parse(args); err != nil { return nil @@ -219,7 +226,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m return nil }, func(outputChan chan interface{}) { - buffer := make([]byte, 16) + buffer := make([]byte, readbufferSize) for item := range outputChan { i := item.(*Item) if f, ok := files[i.vid]; ok { @@ -285,7 +292,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn if *c.verbose { for _, fid := range orphanFileIds { - fmt.Fprintf(c.writer, "%s\n", fid) + fmt.Fprintf(c.writer, "%s:%s\n", vinfo.collection, fid) } } isEcVolumeReplicas[volumeId] = vinfo.isEcVolume @@ -440,7 +447,7 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI defer fp.Close() br := bufio.NewReader(fp) - buffer := make([]byte, 16) + buffer := make([]byte, readbufferSize) var readSize int var readErr error item := &Item{vid: volumeId} @@ -452,7 +459,7 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI if readErr != nil { return readErr } - if readSize != 16 { + if readSize != readbufferSize { return fmt.Errorf("readSize mismatch") } item.fileKey = util.BytesToUint64(buffer[:8]) @@ -541,11 +548,27 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri return } + voluemAddr := pb.NewServerAddressWithGrpcPort(dataNodeId, 0) if err = c.readFilerFileIdFile(volumeId, func(nId types.NeedleId, itemPath util.FullPath) { + inUseCount++ + if *c.verifyNeedle { + if v, ok := db.Get(nId); ok && v.Size.IsValid() { + newSize := types.Size(verifyProbeBlobSize) + if v.Size > newSize { + v.Size = newSize + } + if _, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, voluemAddr, volumeId, *v); err != nil { + fmt.Fprintf(c.writer, "failed to read file %s NeedleBlob %+v: %+v", itemPath, nId, err) + if *c.forcePurging { + return + } + } + } + } + if err = db.Delete(nId); err != nil && *c.verbose { fmt.Fprintf(c.writer, "failed to nm.delete %s(%+v): %+v", itemPath, nId, err) } - inUseCount++ }); err != nil { err = fmt.Errorf("failed to readFilerFileIdFile %+v", err) return @@ -553,7 +576,10 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri var orphanFileCount uint64 if err = db.AscendingVisit(func(n needle_map.NeedleValue) error { - orphanFileIds = append(orphanFileIds, fmt.Sprintf("%s:%d,%s00000000", collection, volumeId, n.Key.String())) + if !n.Size.IsValid() { + return nil + } + orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String())) orphanFileCount++ orphanDataSize += uint64(n.Size) return nil |
