diff options
| author | Lisandro Pin <lisandro.pin@proton.ch> | 2025-12-02 18:29:27 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-02 09:29:27 -0800 |
| commit | ee775825bcd46911adeb811241bad56836dab05f (patch) | |
| tree | 7e29e7e89670f907616b338c485f9428a73a0378 /weed/shell/command_volume_check_disk.go | |
| parent | 733ca8e6df3de57e2dcc0923fb2f166d3222921d (diff) | |
| download | seaweedfs-ee775825bcd46911adeb811241bad56836dab05f.tar.xz seaweedfs-ee775825bcd46911adeb811241bad56836dab05f.zip | |
Parallelize read-only volume check pass for `volume.check.disk`. (#7602)
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 52 |
1 files changed, 29 insertions, 23 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index d7b015979..78b2486dd 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -40,6 +40,8 @@ type volumeCheckDisk struct { syncDeletions bool fixReadOnly bool nonRepairThreshold float64 + + ewg *ErrorWaitGroup } func (c *commandVolumeCheckDisk) Name() string { @@ -92,6 +94,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)") fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)") syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") + maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") if err = fsckCommand.Parse(args); err != nil { return nil @@ -115,6 +118,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write syncDeletions: *syncDeletions, fixReadOnly: *fixReadOnly, nonRepairThreshold: *nonRepairThreshold, + + ewg: NewErrorWaitGroup(*maxParallelization), } // collect topology information @@ -137,11 +142,9 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err := vcd.checkWritableVolumes(volumeReplicas); err != nil { return err } - if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil { - return err - } + vcd.checkReadOnlyVolumes(volumeReplicas) - return nil + return vcd.ewg.Wait() } // checkWritableVolumes fixes volume replicas which are not read-only. @@ -228,9 +231,9 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er return nil } -func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { +func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) { if !vcd.fixReadOnly { - return nil + return } vcd.write("Pass #2 (read-only volumes)\n") @@ -261,35 +264,38 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo skip, err := vcd.shouldSkipVolume(r, source) if err != nil { - vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err) + vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v\n", r.info.Id, err) continue } if skip { continue } - // make volume writable... - if err := vcd.makeVolumeWritable(vid, r); err != nil { - return err - } + vcd.ewg.Add(func() error { + // make volume writable... + if err := vcd.makeVolumeWritable(vid, r); err != nil { + return err + } + + // ...fix it... + // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes. + if err := vcd.syncTwoReplicas(source, r, false); err != nil { + vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) - // ...fix it... - // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes. - if err := vcd.syncTwoReplicas(source, r, false); err != nil { - vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) + // ...or revert it back to read-only, if something went wrong. + // TODO: we should keep unchanged volumes as read-only, so we don't modify valid volumes which are full. + if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { + return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + } + vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id) - // ...or revert it back to read-only, if something went wrong. - if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { - return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + return err } - vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id) - return err - } + return nil + }) } } - - return nil } func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { |
