diff options
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 106 |
1 files changed, 82 insertions, 24 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 7e5c031b2..ae790a418 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -7,13 +7,16 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/server/constants" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "golang.org/x/exp/slices" "google.golang.org/grpc" "io" "math" "net/http" + "sync" "time" ) @@ -22,7 +25,8 @@ func init() { } type commandVolumeCheckDisk struct { - env *CommandEnv + env *CommandEnv + writer io.Writer } func (c *commandVolumeCheckDisk) Name() string { @@ -43,6 +47,66 @@ func (c *commandVolumeCheckDisk) Help() string { ` } +func (c *commandVolumeCheckDisk) getVolumeStatusFileCount(vid uint32, dn *master_pb.DataNodeInfo) (totalFileCount, deletedFileCount uint64) { + err := operation.WithVolumeServerClient(false, pb.NewServerAddressWithGrpcPort(dn.Id, int(dn.GrpcPort)), c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, reqErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{ + VolumeId: uint32(vid), + }) + if resp != nil { + totalFileCount = resp.FileCount + deletedFileCount = resp.FileDeletedCount + } + return reqErr + }) + if err != nil { + fmt.Fprintf(c.writer, "getting number of files for volume id %d from volumes status: %+v\n", vid, err) + } + return totalFileCount, deletedFileCount +} + +func (c *commandVolumeCheckDisk) eqVolumeFileCount(a, b *VolumeReplica) (bool, bool) { + var waitGroup sync.WaitGroup + var fileCountA, fileCountB, fileDeletedCountA, fileDeletedCountB uint64 + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + fileCountA, fileDeletedCountA = c.getVolumeStatusFileCount(a.info.Id, a.location.dataNode) + }() + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + fileCountB, fileDeletedCountB = c.getVolumeStatusFileCount(b.info.Id, b.location.dataNode) + }() + // Trying to synchronize a remote call to two nodes + waitGroup.Wait() + return fileCountA == fileCountB, fileDeletedCountA == fileDeletedCountB +} + +func (c *commandVolumeCheckDisk) shouldSkipVolume(a, b *VolumeReplica, pulseTimeAtSecond int64, syncDeletions, verbose bool) bool { + doSyncDeletedCount := false + if syncDeletions && a.info.DeleteCount != b.info.DeleteCount { + doSyncDeletedCount = true + } + if (a.info.FileCount != b.info.FileCount) || doSyncDeletedCount { + // Do synchronization of volumes, if the modification time was before the last pulsation time + if a.info.ModifiedAtSecond < pulseTimeAtSecond || b.info.ModifiedAtSecond < pulseTimeAtSecond { + return false + } + if eqFileCount, eqDeletedFileCount := c.eqVolumeFileCount(a, b); eqFileCount { + if doSyncDeletedCount && !eqDeletedFileCount { + return false + } + if verbose { + fmt.Fprintf(c.writer, "skipping active volumes %d with the same file counts on %s and %s\n", + a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) + } + } else { + return false + } + } + return true +} + func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) @@ -62,8 +126,10 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write } c.env = commandEnv + c.writer = writer // collect topology information + pulseTimeAtSecond := time.Now().Unix() - constants.VolumePulseSeconds*2 topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) if err != nil { return err @@ -71,52 +137,44 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) // pick 1 pairs of volume replica - fileCount := func(replica *VolumeReplica) uint64 { - return replica.info.FileCount - replica.info.DeleteCount - } - for _, replicas := range volumeReplicas { if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) { continue } slices.SortFunc(replicas, func(a, b *VolumeReplica) int { - return int(fileCount(b) - fileCount(a)) + return int(b.info.FileCount - a.info.FileCount) }) for len(replicas) >= 2 { a, b := replicas[0], replicas[1] - if !*slowMode { - if fileCount(a) == fileCount(b) { - replicas = replicas[1:] - continue - } - } + replicas = replicas[1:] if a.info.ReadOnly || b.info.ReadOnly { - fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) - replicas = replicas[1:] + fmt.Fprintf(writer, "skipping readonly volume %d on %s and %s\n", + a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id) continue } - - if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose, writer); err != nil { + if !*slowMode && c.shouldSkipVolume(a, b, pulseTimeAtSecond, *syncDeletions, *verbose) { + continue + } + if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose); err != nil { fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } - replicas = replicas[1:] } } return nil } -func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (err error) { +func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (err error) { aHasChanges, bHasChanges := true, true for aHasChanges || bHasChanges { - if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose, writer); err != nil { + if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil { return err } } return nil } -func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (aHasChanges bool, bHasChanges bool, err error) { +func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool) (aHasChanges bool, bHasChanges bool, err error) { aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() defer func() { aDB.Close() @@ -125,18 +183,18 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a // read index db readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) - if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, writer, c.env.option.GrpcDialOption); err != nil { + if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) } - if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, writer, c.env.option.GrpcDialOption); err != nil { + if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, c.writer, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) } // find and make up the differences - if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { + if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err) } - if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { + if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, c.writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) } return |
