diff options
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 45 |
1 files changed, 22 insertions, 23 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 43a4b26ca..7e5c031b2 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -22,8 +22,7 @@ func init() { } type commandVolumeCheckDisk struct { - env *CommandEnv - syncDeletions *bool + env *CommandEnv } func (c *commandVolumeCheckDisk) Name() string { @@ -51,7 +50,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write verbose := fsckCommand.Bool("v", false, "verbose mode") volumeId := fsckCommand.Uint("volumeId", 0, "the volume id") applyChanges := fsckCommand.Bool("force", false, "apply the fix") - c.syncDeletions = fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") + syncDeletions := fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") if err = fsckCommand.Parse(args); err != nil { return nil @@ -97,7 +96,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write continue } - if err := c.syncTwoReplicas(a, b, *applyChanges, *nonRepairThreshold, *verbose, writer); err != nil { + if err := c.syncTwoReplicas(a, b, *applyChanges, *syncDeletions, *nonRepairThreshold, *verbose, writer); err != nil { fmt.Fprintf(writer, "sync volume %d on %s and %s: %v\n", a.info.Id, a.location.dataNode.Id, b.location.dataNode.Id, err) } replicas = replicas[1:] @@ -107,17 +106,17 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write return nil } -func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (err error) { +func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (err error) { aHasChanges, bHasChanges := true, true for aHasChanges || bHasChanges { - if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, nonRepairThreshold, verbose, writer); err != nil { + if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose, writer); err != nil { return err } } return nil } -func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (aHasChanges bool, bHasChanges bool, err error) { +func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, verbose bool, writer io.Writer) (aHasChanges bool, bHasChanges bool, err error) { aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() defer func() { aDB.Close() @@ -126,24 +125,24 @@ func (c *commandVolumeCheckDisk) checkBoth(a *VolumeReplica, b *VolumeReplica, a // read index db readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) - if err = c.readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, writer); err != nil { + if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), verbose, writer, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) } - if err := c.readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, writer); err != nil { + if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), verbose, writer, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) } // find and make up the differences - if aHasChanges, err = c.doVolumeCheckDisk(bDB, aDB, b, a, verbose, writer, applyChanges, nonRepairThreshold, readIndexDbCutoffFrom); err != nil { + if aHasChanges, err = doVolumeCheckDisk(bDB, aDB, b, a, verbose, writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", b.location.dataNode.Id, a.location.dataNode.Id, b.info.Id, err) } - if bHasChanges, err = c.doVolumeCheckDisk(aDB, bDB, a, b, verbose, writer, applyChanges, nonRepairThreshold, readIndexDbCutoffFrom); err != nil { + if bHasChanges, err = doVolumeCheckDisk(aDB, bDB, a, b, verbose, writer, applyChanges, doSyncDeletions, nonRepairThreshold, readIndexDbCutoffFrom, c.env.option.GrpcDialOption); err != nil { return true, true, fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) } return } -func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, nonRepairThreshold float64, cutoffFromAtNs uint64) (hasChanges bool, err error) { +func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *VolumeReplica, verbose bool, writer io.Writer, applyChanges bool, doSyncDeletions bool, nonRepairThreshold float64, cutoffFromAtNs uint64, grpcDialOption grpc.DialOption) (hasChanges bool, err error) { // find missing keys // hash join, can be more efficient @@ -158,7 +157,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m return nil } if doCutoffOfLastNeedle { - if needleMeta, err := readNeedleMeta(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { + if needleMeta, err := readNeedleMeta(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil { // needles older than the cutoff time are not missing yet if needleMeta.AppendAtNs > cutoffFromAtNs { return nil @@ -193,7 +192,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m } for _, needleValue := range missingNeedles { - needleBlob, err := readSourceNeedleBlob(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) + needleBlob, err := readSourceNeedleBlob(grpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, needleValue) if err != nil { return hasChanges, err } @@ -208,13 +207,13 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m hasChanges = true - if err = c.writeNeedleBlobToTarget(pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { + if err = writeNeedleBlobToTarget(grpcDialOption, pb.NewServerAddressFromDataNode(target.location.dataNode), source.info.Id, needleValue, needleBlob); err != nil { return hasChanges, err } } - if *c.syncDeletions && len(partiallyDeletedNeedles) > 0 { + if doSyncDeletions && len(partiallyDeletedNeedles) > 0 { var fidList []string for _, needleValue := range partiallyDeletedNeedles { fidList = append(fidList, needleValue.Key.FileId(source.info.Id)) @@ -224,7 +223,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m } deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), - c.env.option.GrpcDialOption, fidList, false) + grpcDialOption, fidList, false) if deleteErr != nil { return hasChanges, deleteErr } @@ -255,9 +254,9 @@ func readSourceNeedleBlob(grpcDialOption grpc.DialOption, sourceVolumeServer pb. return } -func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { +func writeNeedleBlobToTarget(grpcDialOption grpc.DialOption, targetVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue, needleBlob []byte) error { - return operation.WithVolumeServerClient(false, targetVolumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(false, targetVolumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{ VolumeId: volumeId, NeedleId: uint64(needleValue.Key), @@ -269,10 +268,10 @@ func (c *commandVolumeCheckDisk) writeNeedleBlobToTarget(targetVolumeServer pb.S } -func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer) error { +func readIndexDatabase(db *needle_map.MemDb, collection string, volumeId uint32, volumeServer pb.ServerAddress, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { var buf bytes.Buffer - if err := c.copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer); err != nil { + if err := copyVolumeIndexFile(collection, volumeId, volumeServer, &buf, verbose, writer, grpcDialOption); err != nil { return err } @@ -282,9 +281,9 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false) } -func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error { +func copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer, grpcDialOption grpc.DialOption) error { - return operation.WithVolumeServerClient(true, volumeServer, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + return operation.WithVolumeServerClient(true, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ext := ".idx" |
