aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fsck.go
diff options
context:
space:
mode:
authorwyang <wings.wyang@gmail.com>2024-07-31 16:12:57 +0800
committerGitHub <noreply@github.com>2024-07-31 01:12:57 -0700
commit31b89c10627e1c2ac4dd56070df3152c40d75484 (patch)
treea8860f57423026a5b61186ba4af1faa4d2f2f277 /weed/shell/command_volume_fsck.go
parentb2ffcdaab2e662d85913eee7fa11eddb57d3052e (diff)
downloadseaweedfs-31b89c10627e1c2ac4dd56070df3152c40d75484.tar.xz
seaweedfs-31b89c10627e1c2ac4dd56070df3152c40d75484.zip
fsck: only check the appendNs of deleted needle (#5841)
increase fsck speed Co-authored-by: Yang Wang <yangwang@weride.ai>
Diffstat (limited to 'weed/shell/command_volume_fsck.go')
-rw-r--r--weed/shell/command_volume_fsck.go88
1 files changed, 43 insertions, 45 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index dd58175cf..1a70d6d7c 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -7,19 +7,6 @@ import (
"errors"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage"
- "github.com/seaweedfs/seaweedfs/weed/storage/idx"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "golang.org/x/sync/errgroup"
"io"
"math"
"net/http"
@@ -31,7 +18,20 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "golang.org/x/sync/errgroup"
)
func init() {
@@ -164,7 +164,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
delete(volumeIdToVInfo, volumeId)
continue
}
- err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
+ err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo)
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
@@ -199,7 +199,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("failed to collect file ids from filer: %v", err)
}
// volume file ids subtract filer file ids
- if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging); err != nil {
+ if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)); err != nil {
return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
}
}
@@ -289,7 +289,7 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
return nil
}
-func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool) error {
+func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool, modifyFrom, cutoffFrom uint64) error {
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
@@ -299,7 +299,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
serverReplicas := make(map[uint32][]pb.ServerAddress)
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo {
- inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo)
+ inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo, modifyFrom, cutoffFrom)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
@@ -395,7 +395,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
return nil
}
-func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, modifyFrom uint64, cutoffFrom uint64) error {
+func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo) error {
if *c.verbose {
fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
@@ -432,29 +432,6 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
}
buf.Write(resp.FileContent)
}
- if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) {
- index, err := idx.FirstInvalidIndex(buf.Bytes(),
- func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
- resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
- VolumeId: volumeId,
- NeedleId: uint64(key),
- Offset: offset.ToActualOffset(),
- Size: int32(size),
- })
- if err != nil {
- return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
- }
- if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
- return true, nil
- }
- return false, nil
- })
- if err != nil {
- fmt.Fprintf(c.writer, "Failed to search for last valid index on volume %d with error %v\n", volumeId, err)
- } else {
- buf.Truncate(index * types.NeedleMapEntrySize)
- }
- }
idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)
err = writeToFile(buf.Bytes(), idxFilename)
if err != nil {
@@ -570,7 +547,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
}
}
-func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
+func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo, modifyFrom, cutoffFrom uint64) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
volumeFileIdDb := needle_map.NewMemDb()
defer volumeFileIdDb.Close()
@@ -610,9 +587,30 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
if n.Size.IsDeleted() {
return nil
}
- orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
- orphanFileCount++
- orphanDataSize += uint64(n.Size)
+ if cutoffFrom > 0 || modifyFrom > 0 {
+ return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
+ func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
+ VolumeId: volumeId,
+ NeedleId: types.NeedleIdToUint64(n.Key),
+ Offset: n.Offset.ToActualOffset(),
+ Size: int32(n.Size),
+ })
+ if err != nil {
+ return fmt.Errorf("read needle meta with id %d from volume %d: %v", n.Key, volumeId, err)
+ }
+ if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
+ orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
+ orphanFileCount++
+ orphanDataSize += uint64(n.Size)
+ }
+ return nil
+ })
+ } else {
+ orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
+ orphanFileCount++
+ orphanDataSize += uint64(n.Size)
+ }
return nil
}); err != nil {
err = fmt.Errorf("failed to AscendingVisit %+v", err)