aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_check_disk.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
-rw-r--r--weed/shell/command_volume_check_disk.go106
1 files changed, 82 insertions, 24 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 7e5c031b2..ae790a418 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -7,13 +7,16 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/server/constants"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"io"
"math"
"net/http"
+ "sync"
"time"
)
@@ -22,7 +25,8 @@ func init() {
}
type commandVolumeCheckDisk struct {
- env *CommandEnv
+ env *CommandEnv
+ writer io.Writer
}
func (c *commandVolumeCheckDisk) Name() string {
@@ -43,6 +47,66 @@ func (c *commandVolumeCheckDisk) Help() string {
`
}
+func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) {
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
+ VolumeId: uint32(vid),
+ })
+ if resp != nil {
+ totalFileCount = resp.FileCount
+ deletedFileCount = resp.FileDeletedCount
+ }
+ return reqErr
+ })
+ if err != nil {
+ fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err)
+ }
+ return totalFileCount, deletedFileCount
+}
+
+func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) {
+ var waitGroup sync.WaitGroup
+ var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64
+ waitGroup.Add(1)
+ go func() {
+ defer waitGroup.Done()
+ fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode)
+ }()
+ waitGroup.Add(1)
+ go func() {
+ defer waitGroup.Done()
+ fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode)
+ }()
+ // Trying to synchronize a remote call to two nodes
+ waitGroup.Wait()
+ return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB
+}
+
+func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTimeAtSecond int64, syncDeletions, verbose bool) bool {
+ doSyncDeletedCount := false
+ if syncDeletions && a.info.DeleteCount != b.info.DeleteCount {
+ doSyncDeletedCount = true
+ }
+ if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount {
+ // Do synchronization of volumes, if the modification time was before the last pulsation time
+ if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond {
+ return false
+ }
+ if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount {
+ if doSyncDeletedCount && !eqDeletedFileCount {
+ return false
+ }
+ if verbose {
+ fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n",
+ a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
+ }
+ } else {
+ return false
+ }
+ }
+ return true
+}
+
func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
@@ -62,8 +126,10 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
}
c.env = commandEnv
+ c.writer = writer
// collect topology information
+ pulseTimeAtSecond := time.Now().Unix() - constants.VolumePulseSeconds*2
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
@@ -71,52 +137,44 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
// pick 1 pairs of volume replica
- fileCount := func(replica *VolumeReplica) uint64 {
- return replica.info.FileCount - replica.info.DeleteCount
- }
-
for _, replicas := range volumeReplicas {
if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
continue
}
slices.SortFunc(replicas, func(a, b *VolumeReplica) int {
- return int(fileCount(b) - fileCount(a))
+ return int(b.info.FileCount - a.info.FileCount)
})
for len(replicas) >= 2 {
a, b := replicas[0], replicas[1]
- if !*slowMode {
- if fileCount(a) == fileCount(b) {
- replicas = replicas[1:]
- continue
- }
- }
+ replicas = replicas[1:]
if a.info.ReadOnly || b.info.ReadOnly {
- fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
- replicas = replicas[1:]
+ fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n",
+ a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id)
continue
}
-
- if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose, writer); err != nil {
+ if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) {
+ continue
+ }
+ if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil {
fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err)
}
- replicas = replicas[1:]
}
}
return nil
}
-func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (err error) {
+func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) {
aHasChanges, bHasChanges := true, true
for aHasChanges || bHasChanges {
- if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose, writer); err != nil {
+ if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
return err
}
}
return nil
}
-func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (aHasChanges bool, bHasChanges bool, err error) {
+func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) {
aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb()
defer func() {
aDB.Close()
@@ -125,18 +183,18 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a
// read index db
readIndexDbCutoffFrom := uint64(time.Now().UnixNano())
- if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, writer, c.env.option.GrpcDialOption); err != nil {
+ if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err)
}
- if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, writer, c.env.option.GrpcDialOption); err != nil {
+ if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil {
return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err)
}
// find and make up the differences
- if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
+ if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err)
}
- if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
+ if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil {
return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err)
}
return