aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_volume_check_disk.go90
1 files changed, 58 insertions, 32 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index ca7efa5d4..99d0fcf9a 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -3,6 +3,7 @@ package shell
import (
"bytes"
"context"
+ "errors"
"flag"
"fmt"
"io"
@@ -51,8 +52,17 @@ func (c *commandVolumeCheckDisk) Help() string {
find all volumes that are replicated
for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count.
for the pair volume A and B
- append entries in A and not in B to B
- append entries in B and not in A to A
+ bi-directional sync (default): append entries in A and not in B to B, and entries in B and not in A to A
+ uni-directional sync (read-only repair): only sync from source to target without modifying source
+
+ Options:
+ -slow: check all replicas even if file counts are the same
+ -v: verbose mode with detailed progress output
+ -volumeId: check only a specific volume ID (0 for all)
+ -apply: actually apply the fixes (default is simulation mode)
+ -force-readonly: also check and repair read-only volumes using uni-directional sync
+ -syncDeleted: sync deletion records during repair
+ -nonRepairThreshold: maximum fraction of missing keys allowed for repair (default 0.3)
`
}
@@ -158,7 +168,7 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
continue
}
}
- if err := vcd.syncTwoReplicas(a, b); err != nil {
+ if err := vcd.syncTwoReplicas(a, b, true); err != nil {
vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
// always choose the larger volume to be the source
@@ -178,10 +188,6 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo
return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)")
}
-func (vcd *volumeCheckDisk) isLocked() bool {
- return vcd.commandEnv.isLocked()
-}
-
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
return vcd.commandEnv.option.GrpcDialOption
}
@@ -282,62 +288,82 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error)
return true, nil
}
-func (vcd *volumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica) (err error) {
- aHasChanges, bHasChanges := true, true
+// syncTwoReplicas attempts to sync all entries from a source volume replica into a target. If bi-directional mode
+// is enabled, changes from target are also synced back into the source.
+func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (err error) {
+ sourceHasChanges, targetHasChanges := true, true
const maxIterations = 5
iteration := 0
- for (aHasChanges || bHasChanges) && iteration < maxIterations {
+ for (sourceHasChanges || targetHasChanges) && iteration < maxIterations {
iteration++
- vcd.writeVerbose("sync iteration %d for volume %d\n", iteration, a.info.Id)
+ vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id)
- prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges
- if aHasChanges, bHasChanges, err = vcd.checkBoth(a, b); err != nil {
+ prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges
+ if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil {
return err
}
// Detect if we're stuck in a loop with no progress
- if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) {
+ if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) {
vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
- a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration)
+ source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration)
return fmt.Errorf("sync not making progress after %d iterations", iteration)
}
}
- if iteration >= maxIterations && (aHasChanges || bHasChanges) {
+ if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) {
vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
- a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id)
+ source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id)
return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
}
return nil
}
-func (vcd *volumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica) (aHasChanges bool, bHasChanges bool, err error) {
- aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
+// checkBoth performs a sync between source and target volume replicas. If bi-directional mode is enabled, changes from target are also synced back into the source.
+// Returns whether the source and/or target were modified.
+func (vcd *volumeCheckDisk) checkBoth(source, target *VolumeReplica, bidi bool) (sourceHasChanges bool, targetHasChanges bool, err error) {
+ sourceDB, targetDB := needle_map.NewMemDb(), needle_map.NewMemDb()
+ if sourceDB == nil || targetDB == nil {
+ return false, false, fmt.Errorf("failed to allocate in-memory needle DBs")
+ }
defer func() {
- aDB.Close()
- bDB.Close()
+ sourceDB.Close()
+ targetDB.Close()
}()
// read index db
- if err = vcd.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode)); err != nil {
- return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
+ if err = vcd.readIndexDatabase(sourceDB, source.info.Collection, source.info.Id, pb.NewServerAddressFromDataNode(source.location.dataNode)); err != nil {
+ return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %w", source.location.dataNode.Id, source.info.Id, err)
}
- if err := vcd.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode)); err != nil {
- return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
+ if err := vcd.readIndexDatabase(targetDB, target.info.Collection, target.info.Id, pb.NewServerAddressFromDataNode(target.location.dataNode)); err != nil {
+ return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %w", target.location.dataNode.Id, target.info.Id, err)
}
// find and make up the differences
- aHasChanges, err1 := vcd.doVolumeCheckDisk(bDB, aDB, b, a)
- bHasChanges, err2 := vcd.doVolumeCheckDisk(aDB, bDB, a, b)
- if err1 != nil {
- return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1)
+ var errs []error
+ targetHasChanges, errTarget := vcd.doVolumeCheckDisk(sourceDB, targetDB, source, target)
+ if errTarget != nil {
+ errs = append(errs,
+ fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %w",
+ source.location.dataNode.Id, target.location.dataNode.Id, source.info.Id, errTarget))
}
- if err2 != nil {
- return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2)
+ sourceHasChanges = false
+ if bidi {
+ var errSource error
+ sourceHasChanges, errSource = vcd.doVolumeCheckDisk(targetDB, sourceDB, target, source)
+ if errSource != nil {
+ errs = append(errs,
+ fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %w",
+ target.location.dataNode.Id, source.location.dataNode.Id, target.info.Id, errSource))
+ }
+ }
+ if len(errs) > 0 {
+ return sourceHasChanges, targetHasChanges, errors.Join(errs...)
}
- return aHasChanges, bHasChanges, nil
+
+ return sourceHasChanges, targetHasChanges, nil
}
func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica) (hasChanges bool, err error) {