diff options
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 44 | ||||
| -rw-r--r-- | weed/shell/command_volume_check_disk_test.go | 6 |
2 files changed, 28 insertions, 22 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 78b2486dd..dbb64e239 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -10,6 +10,8 @@ import ( "math" "math/rand/v2" "net/http" + "strings" + "sync" "time" "slices" @@ -32,6 +34,7 @@ type commandVolumeCheckDisk struct{} type volumeCheckDisk struct { commandEnv *CommandEnv writer io.Writer + writerMu sync.Mutex now time.Time slowMode bool @@ -149,14 +152,14 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write // checkWritableVolumes fixes volume replicas which are not read-only. func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*VolumeReplica) error { - vcd.write("Pass #1 (writable volumes)\n") + vcd.write("Pass #1 (writable volumes)") for _, replicas := range volumeReplicas { // filter readonly replica var writableReplicas []*VolumeReplica for _, replica := range replicas { if replica.info.ReadOnly { - vcd.write("skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + vcd.write("skipping readonly volume %d on %s", replica.info.Id, replica.location.dataNode.Id) } else { writableReplicas = append(writableReplicas, replica) } @@ -169,7 +172,7 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo a, b := writableReplicas[0], writableReplicas[1] shouldSkip, err := vcd.shouldSkipVolume(a, b) if err != nil { - vcd.write("error checking if volume %d should be skipped: %v\n", a.info.Id, err) + vcd.write("error checking if volume %d should be skipped: %v", a.info.Id, err) // Continue with sync despite error to be safe } else if shouldSkip { // always choose the larger volume to be the source @@ -177,7 +180,7 @@ func (vcd *volumeCheckDisk) checkWritableVolumes(volumeReplicas map[uint32][]*Vo continue } if err := vcd.syncTwoReplicas(a, b, true); err != nil { - vcd.write("sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) + vcd.write("sync volume %d on %s and %s: %v", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } // always choose the larger volume to be the source if a.info.FileCount > b.info.FileCount { @@ -207,7 +210,7 @@ func (vcd *volumeCheckDisk) makeVolumeWritable(vid uint32, vr *VolumeReplica) er return err } - vcd.write("volume %d on %s is now writable\n", vid, vr.location.dataNode.Id) + vcd.write("volume %d on %s is now writable", vid, vr.location.dataNode.Id) return nil } @@ -227,7 +230,7 @@ func (vcd *volumeCheckDisk) makeVolumeReadonly(vid uint32, vr *VolumeReplica) er return err } - vcd.write("volume %d on %s is now read-only\n", vid, vr.location.dataNode.Id) + vcd.write("volume %d on %s is now read-only", vid, vr.location.dataNode.Id) return nil } @@ -235,7 +238,7 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo if !vcd.fixReadOnly { return } - vcd.write("Pass #2 (read-only volumes)\n") + vcd.write("Pass #2 (read-only volumes)") for vid, replicas := range volumeReplicas { roReplicas := []*VolumeReplica{} @@ -249,11 +252,11 @@ func (vcd *volumeCheckDisk) checkReadOnlyVolumes(volumeReplicas map[uint32][]*Vo } } if len(roReplicas) == 0 { - vcd.write("no read-only replicas for volume %d\n", vid) + vcd.write("no read-only replicas for volume %d", vid) continue } if len(rwReplicas) == 0 { - vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from\n", len(roReplicas), vid) + vcd.write("got %d read-only replicas for volume %d and no writable replicas to fix from", len(roReplicas), vid) continue } @@ -303,12 +306,15 @@ func (vcd *volumeCheckDisk) grpcDialOption() grpc.DialOption { } func (vcd *volumeCheckDisk) write(format string, a ...any) { - fmt.Fprintf(vcd.writer, format, a...) + vcd.writerMu.Lock() + defer vcd.writerMu.Unlock() + fmt.Fprintf(vcd.writer, strings.TrimRight(format, "\r\n "), a...) + fmt.Fprint(vcd.writer, "\n") } func (vcd *volumeCheckDisk) writeVerbose(format string, a ...any) { if vcd.verbose { - fmt.Fprintf(vcd.writer, format, a...) + vcd.write(format, a...) } } @@ -394,7 +400,7 @@ func (vcd *volumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica) (bool, error) if doSyncDeletedCount && !eqDeletedFileCount { return false, nil } - vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s\n", + vcd.writeVerbose("skipping active volumes %d with the same file counts on %s and %s", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) } else { return false, nil @@ -412,7 +418,7 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi for (sourceHasChanges || targetHasChanges) && iteration < maxIterations { iteration++ - vcd.writeVerbose("sync iteration %d/%d for volume %d\n", iteration, maxIterations, source.info.Id) + vcd.writeVerbose("sync iteration %d/%d for volume %d", iteration, maxIterations, source.info.Id) prevSourceHasChanges, prevTargetHasChanges := sourceHasChanges, targetHasChanges if sourceHasChanges, targetHasChanges, err = vcd.checkBoth(source, target, bidi); err != nil { @@ -421,14 +427,14 @@ func (vcd *volumeCheckDisk) syncTwoReplicas(source, target *VolumeReplica, bidi // Detect if we're stuck in a loop with no progress if iteration > 1 && prevSourceHasChanges == sourceHasChanges && prevTargetHasChanges == targetHasChanges && (sourceHasChanges || targetHasChanges) { - vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n", + vcd.write("volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop", source.info.Id, source.location.dataNode.Id, target.location.dataNode.Id, iteration) return fmt.Errorf("sync not making progress after %d iterations", iteration) } } if iteration >= maxIterations && (sourceHasChanges || targetHasChanges) { - vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n", + vcd.write("volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention", source.info.Id, maxIterations, source.location.dataNode.Id, target.location.dataNode.Id) return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations) } @@ -518,7 +524,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me return nil }) - vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n", + vcd.write("volume %d %s has %d entries, %s missed %d and partially deleted %d entries", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles)) if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) { @@ -542,7 +548,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me continue } - vcd.writeVerbose("read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) + vcd.writeVerbose("read %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) hasChanges = true if err = vcd.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { @@ -555,7 +561,7 @@ func (vcd *volumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.Me var fidList []string for _, needleValue := range partiallyDeletedNeedles { fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) - vcd.writeVerbose("delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) + vcd.writeVerbose("delete %s %s => %s", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } deleteResults := operation.DeleteFileIdsAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), @@ -610,7 +616,7 @@ func (vcd *volumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection s return err } - vcd.writeVerbose("load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer) + vcd.writeVerbose("load collection %s volume %d index size %d from %s ...", collection, volumeId, buf.Len(), volumeServer) return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } diff --git a/weed/shell/command_volume_check_disk_test.go b/weed/shell/command_volume_check_disk_test.go index eee9103a8..ec958fbc4 100644 --- a/weed/shell/command_volume_check_disk_test.go +++ b/weed/shell/command_volume_check_disk_test.go @@ -278,14 +278,14 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) { } // Test write method - vcd.write("test %s\n", "message") + vcd.write("test %s", "message") if buf.String() != "test message\n" { t.Errorf("write() output = %q, want %q", buf.String(), "test message\n") } // Test writeVerbose with verbose=true buf.Reset() - vcd.writeVerbose("verbose %d\n", 123) + vcd.writeVerbose("verbose %d", 123) if buf.String() != "verbose 123\n" { t.Errorf("writeVerbose() with verbose=true output = %q, want %q", buf.String(), "verbose 123\n") } @@ -293,7 +293,7 @@ func TestVolumeCheckDiskHelperMethods(t *testing.T) { // Test writeVerbose with verbose=false buf.Reset() vcd.verbose = false - vcd.writeVerbose("should not appear\n") + vcd.writeVerbose("should not appear") if buf.String() != "" { t.Errorf("writeVerbose() with verbose=false output = %q, want empty", buf.String()) } |
