aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_volume_check_disk.go28
1 files changed, 25 insertions, 3 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 2f3ccfdc6..4d246e26c 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -183,11 +183,34 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) {
aHasChanges, bHasChanges := true, true
- for aHasChanges || bHasChanges {
+ const maxIterations = 5
+ iteration := 0
+
+ for (aHasChanges || bHasChanges) && iteration < maxIterations {
+ iteration++
+ if verbose {
+ fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id)
+ }
+
+ prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges
if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
return err
}
+
+ // Detect if we're stuck in a loop with no progress
+ if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) {
+ fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
+ a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, iteration)
+ return fmt.Errorf("sync not making progress after %d iterations", iteration)
+ }
}
+
+ if iteration >= maxIterations && (aHasChanges || bHasChanges) {
+ fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
+ a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id)
+ return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
+ }
+
return nil
}
@@ -307,11 +330,10 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
for _, deleteResult := range deleteResults {
if deleteResult.Status == http.StatusAccepted && deleteResult.Size > 0 {
hasChanges = true
- return
}
}
}
- return
+ return hasChanges, nil
}
func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (needleBlob []byte, err error) {