diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-04 10:44:31 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-04 10:44:31 -0800 |
| commit | 66e2d9bca1397489309e0754f7c059c398934012 (patch) | |
| tree | 0e326035a43aa360c894aaeec2b9bddcb3aa63ce /weed/shell/command_volume_check_disk.go | |
| parent | 49ed42b367914ac4f3e2853e698e8fc05ddac24e (diff) | |
| parent | 8d110b29ddfd9b9cdb504a4380106b2b287155ca (diff) | |
| download | seaweedfs-origin/feature/tus-protocol.tar.xz seaweedfs-origin/feature/tus-protocol.zip | |
Merge branch 'master' into feature/tus-protocolorigin/feature/tus-protocol
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 131 |
1 files changed, 80 insertions, 51 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index d7b015979..4d775000f 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -10,6 +10,8 @@ import ( "math" "math/rand/v2" "net/http" + "strings" + "sync" "time" "slices" @@ -32,6 +34,7 @@ type commandVolumeCheckDisk struct{} type volumeCheckDisk struct { commandEnv *CommandEnv writer io.Writer + writerMu sync.Mutex now time.Time slowMode bool @@ -40,6 +43,8 @@ type volumeCheckDisk struct { syncDeletions bool fixReadOnly bool nonRepairThreshold float64 + + ewg *ErrorWaitGroup } func (c *commandVolumeCheckDisk) Name() string { @@ -59,9 +64,9 @@ func (c *commandVolumeCheckDisk) Help() string { append entries in B and not in A to A optionally, for each non-writable volume replica A - if volume is not full + select a writable volume replica B + if entries in A don't match B prune late volume entries not matching its index file - select a writable volume replica B append missing entries from B into A mark the volume as writable (healthy) @@ -92,6 +97,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write applyChangesAlias := fsckCommand.Bool("force", false, "apply the fix (alias for -apply)") fixReadOnly := fsckCommand.Bool("fixReadOnly", false, "apply the fix even on readonly volumes (EXPERIMENTAL!)") syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") + maxParallelization := fsckCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") if err = fsckCommand.Parse(args); err != nil { return nil @@ -115,6 +121,8 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write syncDeletions: *syncDeletions, fixReadOnly: *fixReadOnly, nonRepairThreshold: *nonRepairThreshold, + + ewg: NewErrorWaitGroup(*maxParallelization), } // collect topology information @@ -137,23 +145,21 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if err := vcd.checkWritableVolumes(volumeReplicas); err != nil { return err } - if err := vcd.checkReadOnlyVolumes(volumeReplicas); err != nil { - return err - } + vcd.checkReadOnlyVolumes(volumeReplicas) - return nil + return vcd.ewg.Wait() } // checkWritableVolumes fixes volume replicas which are not read-only. func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { - vcd.write("Pass #1 (writable volumes)\n") + vcd.write("Pass #1 (writable volumes)") for _, replicas := range volumeReplicas { // filter readonly replica var writableReplicas []*VolumeReplica for _, replica := range replicas { if replica.info.ReadOnly { - vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + vcd.write("skipping readonly volume %d on %s", replica.info.Id, replica.location.dataNode.Id) } else { writableReplicas = append(writableReplicas, replica) } @@ -166,16 +172,23 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo a, b := writableReplicas[0], writableReplicas[1] shouldSkip, err := vcd.shouldSkipVolume(a, b) if err != nil { - vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err) + vcd.write("error checking if volume %d should be skipped: %v", a.info.Id, err) // Continue with sync despite error to be safe } else if shouldSkip { // always choose the larger volume to be the source writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) continue } - if err := vcd.syncTwoReplicas(a, b, true); err != nil { - vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + + modified, err := vcd.syncTwoReplicas(a, b, true) + if err != nil { + vcd.write("failed to sync volumes %d on %s and %s: %v", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + } else { + if modified { + vcd.write("synced %s and %s for volume %d", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id) + } } + // always choose the larger volume to be the source if a.info.FileCount > b.info.FileCount { writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) @@ -204,7 +217,7 @@ func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) er return err } - vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id) + vcd.write("volume %d on %s is now writable", vid, vr.location.dataNode.Id) return nil } @@ -224,15 +237,15 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er return err } - vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id) + vcd.write("volume %d on %s is now read-only", vid, vr.location.dataNode.Id) return nil } -func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { +func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*VolumeReplica) { if !vcd.fixReadOnly { - return nil + return } - vcd.write("Pass #2 (read-only volumes)\n") + vcd.write("Pass #2 (read-only volumes)") for vid, replicas := range volumeReplicas { roReplicas := []*VolumeReplica{} @@ -246,11 +259,11 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo } } if len(roReplicas) == 0 { - vcd.write("no read-only replicas for volume %d\n", vid) + vcd.write("no read-only replicas for volume %d", vid) continue } if len(rwReplicas) == 0 { - vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid) + vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from", len(roReplicas), vid) continue } @@ -261,35 +274,44 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo skip, err := vcd.shouldSkipVolume(r, source) if err != nil { - vcd.write("error checking if volume %d should be skipped: %v\n", r.info.Id, err) + vcd.ewg.AddErrorf("failed to check if volume %d should be skipped: %v\n", r.info.Id, err) continue } if skip { continue } - // make volume writable... - if err := vcd.makeVolumeWritable(vid, r); err != nil { - return err - } + vcd.ewg.Add(func() error { + // make volume writable... + if err := vcd.makeVolumeWritable(vid, r); err != nil { + return err + } - // ...fix it... - // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes. - if err := vcd.syncTwoReplicas(source, r, false); err != nil { - vcd.write("sync read-only volume %d on %s from %s: %v\n", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) + // ...try to fix it... + // TODO: test whether syncTwoReplicas() is enough to prune garbage entries on broken volumes... + modified, err := vcd.syncTwoReplicas(source, r, false) + if err != nil { + vcd.write("sync read-only volume %d on %s from %s: %v", vid, r.location.dataNode.Id, source.location.dataNode.Id, err) - // ...or revert it back to read-only, if something went wrong. - if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { - return fmt.Errorf("failed to make volume %d on %s readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + if roErr := vcd.makeVolumeReadonly(vid, r); roErr != nil { + return fmt.Errorf("failed to revert volume %d on %s to readonly after: %v: %v", vid, r.location.dataNode.Id, err, roErr) + } + return err + } else { + if modified { + vcd.write("volume %d on %s is now synced to %d and writable", vid, r.location.dataNode.Id, source.location.dataNode.Id) + } else { + // ...or restore back to read-only, if no changes were made. + if err := vcd.makeVolumeReadonly(vid, r); err != nil { + return fmt.Errorf("failed to revert volume %d on %s to readonly: %v", vid, r.location.dataNode.Id, err) + } + } } - vcd.write("volume %d on %s is now read-only\n", vid, r.location.dataNode.Id) - return err - } + return nil + }) } } - - return nil } func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { @@ -297,12 +319,15 @@ func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { } func (vcd *volumeCheckDisk) write(format string, a ...any) { - fmt.Fprintf(vcd.writer, format, a...) + vcd.writerMu.Lock() + defer vcd.writerMu.Unlock() + fmt.Fprintf(vcd.writer, strings.TrimRight(format, "\r\n "), a...) + fmt.Fprint(vcd.writer, "\n") } func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) { if vcd.verbose { - fmt.Fprintf(vcd.writer, format, a...) + vcd.write(format, a...) } } @@ -388,7 +413,7 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) if doSyncDeletedCount && !eqDeletedFileCount { return false, nil } - vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n", + vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) } else { return false, nil @@ -399,35 +424,39 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) // syncTwoReplicas attempts to sync all entries from a source volume replica into a target. If bi-directional mode // is enabled, changes from target are also synced back into the source. -func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (err error) { +// Returns true if source and/or target were modified, false otherwise. +func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi bool) (modified bool, err error) { sourceHasChanges, targetHasChanges := true, true const maxIterations = 5 iteration := 0 + modified = false + for (sourceHasChanges || targetHasChanges) && iteration < maxIterations { iteration++ - vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id) + vcd.writeVerbose("sync iteration %d/%d for volume %d", iteration, maxIterations, source.info.Id) prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil { - return err + return modified, err } + modified = modified || sourceHasChanges || targetHasChanges // Detect if we're stuck in a loop with no progress if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { - vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", + vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop", source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration) - return fmt.Errorf("sync not making progress after %d iterations", iteration) + return modified, fmt.Errorf("sync not making progress after %d iterations", iteration) } } if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) { - vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", + vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention", source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id) - return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) + return modified, fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) } - return nil + return modified, nil } // checkBoth performs a sync between source and target volume replicas. If bi-directional mode is enabled, changes from target are also synced back into the source. @@ -512,7 +541,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me return nil }) - vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", + vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { @@ -536,7 +565,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me continue } - vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) + vcd.writeVerbose("read %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) hasChanges = true if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { @@ -549,7 +578,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me var fidList []string for _, needleValue := range partiallyDeletedNeedles { fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) - vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) + vcd.writeVerbose("delete %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } deleteResults := operation.DeleteFileIdsAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), @@ -604,7 +633,7 @@ func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection s return err } - vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) + vcd.writeVerbose("load collection %s volume %d index size %d from %s ...", collection, volumeId, buf.Len(), volumeServer) return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } @@ -616,7 +645,7 @@ func (vcd *volumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: volumeId, - Ext: ".idx", + Ext: ext, CompactionRevision: math.MaxUint32, StopOffset: math.MaxInt64, Collection: collection, |
