aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_check_disk.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_check_disk.go')
-rw-r--r--weed/shell/command_volume_check_disk.go58
1 files changed, 45 insertions, 13 deletions
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 41c28b810..6a1634f8a 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -13,6 +13,7 @@ import (
"google.golang.org/grpc"
"io"
"math"
+ "net/http"
"time"
)
@@ -21,7 +22,8 @@ func init() {
}
type commandVolumeCheckDisk struct {
- env *CommandEnv
+ env *CommandEnv
+ syncDeletions *bool
}
func (c *commandVolumeCheckDisk) Name() string {
@@ -49,6 +51,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
verbose := fsckCommand.Bool("v", false, "verbose mode")
volumeId := fsckCommand.Uint("volumeId", 0, "the volume id")
applyChanges := fsckCommand.Bool("force", false, "apply the fix")
+ c.syncDeletions = fsckCommand.Bool("syncDeleted", false, "sync of deletions the fix")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -145,13 +148,17 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
// find missing keys
// hash join, can be more efficient
var missingNeedles []needle_map.NeedleValue
+ var partiallyDeletedNeedles []needle_map.NeedleValue
var counter int
doCutoffOfLastNeedle := true
- minuend.DescendingVisit(func(value needle_map.NeedleValue) error {
+ minuend.DescendingVisit(func(minuendValue needle_map.NeedleValue) error {
counter++
- if _, found := subtrahend.Get(value.Key); !found {
+ if subtrahendValue, found := subtrahend.Get(minuendValue.Key); !found {
+ if minuendValue.Size.IsDeleted() {
+ return nil
+ }
if doCutoffOfLastNeedle {
- if needleMeta, err := readNeedleMeta(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, value); err == nil {
+ if needleMeta, err := readNeedleMeta(c.env.option.GrpcDialOption, pb.NewServerAddressFromDataNode(source.location.dataNode), source.info.Id, minuendValue); err == nil {
// needles older than the cutoff time are not missing yet
if needleMeta.AppendAtNs > cutoffFromAtNs {
return nil
@@ -159,16 +166,22 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
doCutoffOfLastNeedle = false
}
}
- missingNeedles = append(missingNeedles, value)
- } else if doCutoffOfLastNeedle {
- doCutoffOfLastNeedle = false
+ missingNeedles = append(missingNeedles, minuendValue)
+ } else {
+ if minuendValue.Size.IsDeleted() && !subtrahendValue.Size.IsDeleted() {
+ partiallyDeletedNeedles = append(partiallyDeletedNeedles, minuendValue)
+ }
+ if doCutoffOfLastNeedle {
+ doCutoffOfLastNeedle = false
+ }
}
return nil
})
- fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d entries\n", source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles))
+ fmt.Fprintf(writer, "volume %d %s has %d entries, %s missed %d and partially deleted %d entries\n",
+ source.info.Id, source.location.dataNode.Id, counter, target.location.dataNode.Id, len(missingNeedles), len(partiallyDeletedNeedles))
- if counter == 0 || len(missingNeedles) == 0 {
+ if counter == 0 || (len(missingNeedles) == 0 && len(partiallyDeletedNeedles) == 0) {
return false, nil
}
@@ -190,7 +203,7 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
}
if verbose {
- fmt.Fprintf(writer, "read %d,%x %s => %s \n", source.info.Id, needleValue.Key, source.location.dataNode.Id, target.location.dataNode.Id)
+ fmt.Fprintf(writer, "read %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
hasChanges = true
@@ -201,6 +214,27 @@ func (c *commandVolumeCheckDisk) doVolumeCheckDisk(minuend, subtrahend *needle_m
}
+ if *c.syncDeletions && len(partiallyDeletedNeedles) > 0 {
+ var fidList []string
+ for _, needleValue := range partiallyDeletedNeedles {
+ fidList = append(fidList, needleValue.Key.FileId(source.info.Id))
+ if verbose {
+ fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
+ }
+ }
+ deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(
+ pb.NewServerAddressFromDataNode(target.location.dataNode),
+ c.env.option.GrpcDialOption, fidList, false)
+ if deleteErr != nil {
+ return hasChanges, deleteErr
+ }
+ for _, deleteResult := range deleteResults {
+ if deleteResult.Status == http.StatusAccepted {
+ hasChanges = true
+ return
+ }
+ }
+ }
return
}
@@ -245,9 +279,7 @@ func (c *commandVolumeCheckDisk) readIndexDatabase(db *needle_map.MemDb, collect
if verbose {
fmt.Fprintf(writer, "load collection %s volume %d index size %d from %s ...\n", collection, volumeId, buf.Len(), volumeServer)
}
-
- return db.LoadFromReaderAt(bytes.NewReader(buf.Bytes()))
-
+ return db.LoadFilterFromReaderAt(bytes.NewReader(buf.Bytes()), true, false)
}
func (c *commandVolumeCheckDisk) copyVolumeIndexFile(collection string, volumeId uint32, volumeServer pb.ServerAddress, buf *bytes.Buffer, verbose bool, writer io.Writer) error {