aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2025-12-02 18:29:27 +0100
committerGitHub <noreply@github.com>2025-12-02 09:29:27 -0800
commitee775825bcd46911adeb811241bad56836dab05f (patch)
tree7e29e7e89670f907616b338c485f9428a73a0378
parent733ca8e6df3de57e2dcc0923fb2f166d3222921d (diff)
downloadseaweedfs-ee775825bcd46911adeb811241bad56836dab05f.tar.xz
seaweedfs-ee775825bcd46911adeb811241bad56836dab05f.zip
Parallelize read-only volume check pass for `volume.check.disk`. (#7602)
-rw-r--r--weed/shell/command_volume_check_disk.go52
-rw-r--r--weed/shell/common.go8
2 files changed, 37 insertions, 23 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index d7b015979..78b2486dd 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -40,6 +40,8 @@ type volumeCheckDisk struct {
syncDeletions bool
fixReadOnly bool
nonRepairThreshold float64
+
+ ewg *ErrorWaitGroup
}
func (c *commandVolumeCheckDisk) Name() string {
@@ -92,6 +94,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)")
syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
+ maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -115,6 +118,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
syncDeletions: *syncDeletions,
fixReadOnly: *fixReadOnly,
nonRepairThreshold: *nonRepairThreshold,
+
+ ewg: NewErrorWaitGroup(*maxParallelization),
}
// collect topology information
@@ -137,11 +142,9 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if err := vcd.checkWritableVolumes(volumeReplicas); err != nil {
return err
}
- if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
- return err
- }
+ vcd.checkReadOnlyVolumes(volumeReplicas)
- return nil
+ return vcd.ewg.Wait()
}
// checkWritableVolumes fixes volume replicas which are not read-only.
@@ -228,9 +231,9 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er
return nil
}
-func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
+func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) {
if !vcd.fixReadOnly {
- return nil
+ return
}
vcd.write("Pass #2 (read-only volumes)\n")
@@ -261,35 +264,38 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
skip, err := vcd.shouldSkipVolume(r, source)
if err != nil {
- vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
+ vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v\n", r.info.Id, err)
continue
}
if skip {
continue
}
- // make volume writable...
- if err := vcd.makeVolumeWritable(vid, r); err != nil {
- return err
- }
+ vcd.ewg.Add(func() error {
+ // make volume writable...
+ if err := vcd.makeVolumeWritable(vid, r); err != nil {
+ return err
+ }
+
+ // ...fix it...
+ // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
+ if err := vcd.syncTwoReplicas(source, r, false); err != nil {
+ vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
- // ...fix it...
- // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
- if err := vcd.syncTwoReplicas(source, r, false); err != nil {
- vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
+ // ...or revert it back to read-only, if something went wrong.
+ // TODO: we should keep unchanged volumes as read-only, so we don't modify valid volumes which are full.
+ if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
+ return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
+ }
+ vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
- // ...or revert it back to read-only, if something went wrong.
- if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
- return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
+ return err
}
- vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
- return err
- }
+ return nil
+ })
}
}
-
- return nil
}
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
diff --git a/weed/shell/common.go b/weed/shell/common.go
index 43571176e..cb2df5828 100644
--- a/weed/shell/common.go
+++ b/weed/shell/common.go
@@ -2,6 +2,7 @@ package shell
import (
"errors"
+ "fmt"
"sync"
)
@@ -64,6 +65,13 @@ func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
}()
}
+// AddErrorf formats an error and appends it directly to the ErrorWaitGroup's collected errors (under errorsMu), without queueing any goroutines.
+func (ewg *ErrorWaitGroup) AddErrorf(format string, a ...interface{}) {
+ ewg.errorsMu.Lock()
+ ewg.errors = append(ewg.errors, fmt.Errorf(format, a...))
+ ewg.errorsMu.Unlock()
+}
+
// Wait sleeps until all ErrorWaitGroupTasks are completed, then returns errors for them.
func (ewg *ErrorWaitGroup) Wait() error {
ewg.wg.Wait()