aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_volume_check_disk.go167
1 files changed, 137 insertions, 30 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 99d0fcf9a..054c0cb67 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -37,6 +37,7 @@ type volumeCheckDisk struct {
verbose bool
applyChanges bool
syncDeletions bool
+ fixReadOnly bool
nonRepairThreshold float64
}
@@ -48,19 +49,27 @@ func (c *commandVolumeCheckDisk) Help() string {
return `check all replicated volumes to find and fix inconsistencies. It is optional and resource intensive.
How it works:
-
+
find all volumes that are replicated
- for each volume id, if there are more than 2 replicas, find one pair with the largest 2 in file count.
- for the pair volume A and B
- bi-directional sync (default): append entries in A and not in B to B, and entries in B and not in A to A
- uni-directional sync (read-only repair): only sync from source to target without modifying source
+
+ for each writable volume ID, if there are more than 2 replicas, find one pair with the largest 2 in file count
+ for the pair volume A and B
+ append entries in A and not in B to B
+ append entries in B and not in A to A
+
+ optionally, for each non-writable volume replica A
+ if volume is not full
+ prune late volume entries not matching its index file
+ select a writable volume replica B
+ append missing entries from B into A
+ mark the volume as writable (healthy)
Options:
-slow: check all replicas even if file counts are the same
-v: verbose mode with detailed progress output
-volumeId: check only a specific volume ID (0 for all)
-apply: actually apply the fixes (default is simulation mode)
- -force-readonly: also check and repair read-only volumes using uni-directional sync
+ -fixReadOnly: also check and repair read-only volumes using uni-directional sync
-syncDeleted: sync deletion records during repair
-nonRepairThreshold: maximum fraction of missing keys allowed for repair (default 0.3)
@@ -80,7 +89,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
applyChanges := fsckCommand.Bool("apply", false, "apply the fix")
// TODO: remove this alias
applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)")
- forceReadonly := fsckCommand.Bool("force-readonly", false, "apply the fix even on readonly volumes")
+ fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)")
syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil {
@@ -103,6 +112,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
verbose: *verbose,
applyChanges: *applyChanges,
syncDeletions: *syncDeletions,
+ fixReadOnly: *fixReadOnly,
nonRepairThreshold: *nonRepairThreshold,
}
@@ -123,24 +133,20 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
}
}
- vcd.write("Pass #1 (writeable volumes)\n")
- if err := vcd.checkWriteableVolumes(volumeReplicas); err != nil {
+ if err := vcd.checkWritableVolumes(volumeReplicas); err != nil {
return err
}
- if *forceReadonly {
- vcd.write("Pass #2 (read-only volumes)\n")
- if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
- return err
- }
-
+ if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil {
+ return err
}
return nil
}
-// checkWriteableVolumes fixes volume replicas which are not read-only.
-func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
- // pick 1 pairs of volume replica
+// checkWritableVolumes fixes volume replicas which are not read-only.
+func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
+ vcd.write("Pass #1 (writable volumes)\n")
+
for _, replicas := range volumeReplicas {
// filter readonly replica
var writableReplicas []*VolumeReplica
@@ -157,16 +163,14 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
})
for len(writableReplicas) >= 2 {
a, b := writableReplicas[0], writableReplicas[1]
- if !vcd.slowMode {
- shouldSkip, err := vcd.shouldSkipVolume(a, b)
- if err != nil {
- vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
- // Continue with sync despite error to be safe
- } else if shouldSkip {
- // always choose the larger volume to be the source
- writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
- continue
- }
+ shouldSkip, err := vcd.shouldSkipVolume(a, b)
+ if err != nil {
+ vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err)
+ // Continue with sync despite error to be safe
+ } else if shouldSkip {
+ // always choose the larger volume to be the source
+ writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
+ continue
}
if err := vcd.syncTwoReplicas(a, b, true); err != nil {
vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
@@ -183,9 +187,107 @@ func (vcd *volumeCheckDisk) checkWriteableVolumes(volumeReplicas map[uint32][]*V
return nil
}
-// checkReadOnlyVolumes fixes read-only volume replicas.
+// makeVolumeWritable flags a volume as writable, by volume ID.
+func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) error {
+ if !vcd.applyChanges {
+ return nil
+ }
+
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(vr.location.dataNode), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, vsErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
+ VolumeId: vid,
+ })
+ return vsErr
+ })
+ if err != nil {
+ return err
+ }
+
+ vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id)
+ return nil
+}
+
+// makeVolumeReadOnly flags a volume as read-only, by volume ID.
+func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) error {
+ if !vcd.applyChanges {
+ return nil
+ }
+
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(vr.location.dataNode), vcd.grpcDialOption(), func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, vsErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+ VolumeId: vid,
+ })
+ return vsErr
+ })
+ if err != nil {
+ return err
+ }
+
+ vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id)
+ return nil
+}
+
func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error {
- return fmt.Errorf("not yet implemented (https://github.com/seaweedfs/seaweedfs/issues/7442)")
+ if !vcd.fixReadOnly {
+ return nil
+ }
+ vcd.write("Pass #2 (read-only volumes)\n")
+
+ for vid, replicas := range volumeReplicas {
+ var source *VolumeReplica = nil
+ roReplicas := []*VolumeReplica{}
+
+ for _, r := range replicas {
+ if r.info.ReadOnly {
+ roReplicas = append(roReplicas, r)
+ } else {
+ // we assume all writable replicas are identical by this point, after the checkWritableVolumes() pass.
+ source = r
+ }
+ }
+ if len(roReplicas) == 0 {
+ vcd.write("no read-only replicas for volume %d\n", vid)
+ continue
+ }
+ if source == nil {
+ vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid)
+ continue
+ }
+
+ // attempt to fix read-only replicas from the know good source
+ for _, r := range roReplicas {
+ // TODO: skip full readonly volumes.
+ skip, err := vcd.shouldSkipVolume(r, source)
+ if err != nil {
+ vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err)
+ continue
+ }
+ if skip {
+ continue
+ }
+
+ // make volume writable...
+ if err := vcd.makeVolumeWritable(vid, r); err != nil {
+ return err
+ }
+
+ // ...fix it...
+ // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes.
+ if err := vcd.syncTwoReplicas(source, r, false); err != nil {
+ vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err)
+
+ // ...or revert it back to read-only, if something went wrong.
+ if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil {
+ return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr)
+ }
+ vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id)
+
+ return err
+ }
+ }
+ }
+
+ return nil
}
func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption {
@@ -260,6 +362,11 @@ func (vcd *volumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool,
// Error Handling: Errors from eqVolumeFileCount are wrapped with context and propagated.
// The Do method logs these errors and continues processing to ensure other volumes are checked.
func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) {
+ if vcd.slowMode {
+ // never skip volumes on slow mode
+ return false, nil
+ }
+
pulseTimeAtSecond := vcd.now.Add(-constants.VolumePulsePeriod * 2).Unix()
doSyncDeletedCount := false
if vcd.syncDeletions && a.info.DeleteCount != b.info.DeleteCount {