diff options
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 57 |
1 files changed, 38 insertions, 19 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 0e76f6ac9..f72dff243 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -5,6 +5,12 @@ import ( "context" "flag" "fmt" + "io" + "math" + "net/http" + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -13,11 +19,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "golang.org/x/exp/slices" "google.golang.org/grpc" - "io" - "math" - "net/http" - "sync" - "time" ) func init() { @@ -47,6 +48,10 @@ func (c *commandVolumeCheckDisk) Help() string { ` } +func (c *commandVolumeCheckDisk) HasTag(tag CommandTag) bool { + return tag == ResourceHeavy +} + func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) { err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ @@ -141,23 +146,35 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) { continue } - slices.SortFunc(replicas, func(a, b *VolumeReplica) int { + // filter readonly replica + var writableReplicas []*VolumeReplica + for _, replica := range replicas { + if replica.info.ReadOnly { + fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id) + } else { + writableReplicas = append(writableReplicas, replica) + } + } + + slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int { return int(b.info.FileCount - a.info.FileCount) }) - for len(replicas) >= 2 { - a, b := replicas[0], replicas[1] - replicas = replicas[1:] - if a.info.ReadOnly || b.info.ReadOnly { - fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n", - a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) - continue - } + for len(writableReplicas) >= 2 { + a, b := writableReplicas[0], writableReplicas[1] if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) { + // always choose the larger volume to be the source + writableReplicas = append(replicas[:1], writableReplicas[2:]...) continue } if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } + // always choose the larger volume to be the source + if a.info.FileCount > b.info.FileCount { + writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...) + } else { + writableReplicas = writableReplicas[1:] + } } } @@ -191,13 +208,15 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a } // find and make up the differences - if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { - return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err) + aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption) + if err1 != nil { + return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1) } - if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { - return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) + if err2 != nil { + return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2) } - return + return aHasChanges, bHasChanges, nil } func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) { |
