aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_volume_check_disk.go53
1 files changed, 34 insertions, 19 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 0e76f6ac9..669bbaf3b 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -5,6 +5,12 @@ import (
"context"
"flag"
"fmt"
+ "io"
+ "math"
+ "net/http"
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -13,11 +19,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
- "io"
- "math"
- "net/http"
- "sync"
- "time"
)
func init() {
@@ -141,23 +142,35 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
continue
}
- slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
+ // filter readonly replica
+ var writableReplicas []*VolumeReplica
+ for _, replica := range replicas {
+ if replica.info.ReadOnly {
+ fmt.Fprintf(writer, "skipping readonly volume %d on %s\n", replica.info.Id, replica.location.dataNode.Id)
+ } else {
+ writableReplicas = append(writableReplicas, replica)
+ }
+ }
+
+ slices.SortFunc(writableReplicas, func(a, b *VolumeReplica) int {
return int(b.info.FileCount - a.info.FileCount)
})
- for len(replicas) >= 2 {
- a, b := replicas[0], replicas[1]
- replicas = replicas[1:]
- if a.info.ReadOnly || b.info.ReadOnly {
- fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n",
- a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
- continue
- }
+ for len(writableReplicas) >= 2 {
+ a, b := writableReplicas[0], writableReplicas[1]
if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) {
+ // always choose the larger volume to be the source
+ writableReplicas = append(replicas[:1], writableReplicas[2:]...)
continue
}
if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil {
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
+ // always choose the larger volume to be the source
+ if a.info.FileCount > b.info.FileCount {
+ writableReplicas = append(writableReplicas[:1], writableReplicas[2:]...)
+ } else {
+ writableReplicas = writableReplicas[1:]
+ }
}
}
@@ -191,13 +204,15 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
}
// find and make up the differences
- if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
- return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err)
+ aHasChanges, err1 := doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
+ bHasChanges, err2 := doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption)
+ if err1 != nil {
+ return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err1)
}
- if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
- return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
+ if err2 != nil {
+ return aHasChanges, bHasChanges, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err2)
}
- return
+ return aHasChanges, bHasChanges, nil
}
func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) {