aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fsck.go
diff options
context:
space:
mode:
authorNyaMisty <gyc990326@gmail.com>2024-06-03 05:25:42 +0800
committerGitHub <noreply@github.com>2024-06-02 14:25:42 -0700
commit579ebbdf602c13c839f6e933163cf16f81d25bfb (patch)
treec30d4a92ee8b0bca6439775af6999f69fe898748 /weed/shell/command_volume_fsck.go
parent0c62d591e2081cc198110ee557acab773a034236 (diff)
downloadseaweedfs-579ebbdf602c13c839f6e933163cf16f81d25bfb.tar.xz
seaweedfs-579ebbdf602c13c839f6e933163cf16f81d25bfb.zip
Support concurrent volume.fsck & support disabling -cutoffTimeAgo to improve speed (#5636)
Diffstat (limited to 'weed/shell/command_volume_fsck.go')
-rw-r--r--weed/shell/command_volume_fsck.go51
1 files changed, 33 insertions, 18 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 8916e90bd..d85a9e13f 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "golang.org/x/sync/errgroup"
"io"
"math"
"net/http"
@@ -137,32 +138,46 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("read filer buckets path: %v", err)
}
- collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano()
+ var collectCutoffFromAtNs int64 = 0
+ if cutoffTimeAgo.Seconds() != 0 {
+ collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano()
+ }
var collectModifyFromAtNs int64 = 0
if modifyTimeAgo.Seconds() != 0 {
collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano()
}
// collect each volume file ids
- for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
- for volumeId, vinfo := range volumeIdToVInfo {
- if len(c.volumeIds) > 0 {
- if _, ok := c.volumeIds[volumeId]; !ok {
+ eg, gCtx := errgroup.WithContext(context.Background())
+ _ = gCtx
+ for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
+ dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo
+ eg.Go(func() error {
+ for volumeId, vinfo := range volumeIdToVInfo {
+ if len(c.volumeIds) > 0 {
+ if _, ok := c.volumeIds[volumeId]; !ok {
+ delete(volumeIdToVInfo, volumeId)
+ continue
+ }
+ }
+ if *c.collection != "" && vinfo.collection != *c.collection {
delete(volumeIdToVInfo, volumeId)
continue
}
+ err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
+ if err != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+ }
}
- if *c.collection != "" && vinfo.collection != *c.collection {
- delete(volumeIdToVInfo, volumeId)
- continue
- }
- err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
- if err != nil {
- return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+ if *c.verbose {
+ fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
}
- }
- if *c.verbose {
- fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
- }
+ return nil
+ })
+ }
+ err = eg.Wait()
+ if err != nil {
+ fmt.Fprintf(c.writer, "got error: %v", err)
+ return err
}
if *c.findMissingChunksInFiler {
@@ -416,7 +431,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
}
buf.Write(resp.FileContent)
}
- if !vinfo.isReadOnly {
+ if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) {
index, err := idx.FirstInvalidIndex(buf.Bytes(),
func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
@@ -428,7 +443,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
if err != nil {
return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
}
- if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) {
+ if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
return true, nil
}
return false, nil