aboutsummaryrefslogtreecommitdiff
path: root/weed/command/fix.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/fix.go')
-rw-r--r--weed/command/fix.go58
1 files changed, 46 insertions, 12 deletions
diff --git a/weed/command/fix.go b/weed/command/fix.go
index b226a0b1a..4fb4ed88e 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -32,12 +32,15 @@ var cmdFix = &Command{
var (
fixVolumeCollection = cmdFix.Flag.String("collection", "", "an optional volume collection name, if specified only it will be processed")
fixVolumeId = cmdFix.Flag.Int64("volumeId", 0, "an optional volume id, if not 0 (default) only it will be processed")
+ fixIncludeDeleted = cmdFix.Flag.Bool("includeDeleted", true, "include deleted entries in the index file")
fixIgnoreError = cmdFix.Flag.Bool("ignoreError", false, "an optional, if true will be processed despite errors")
)
type VolumeFileScanner4Fix struct {
- version needle.Version
- nm *needle_map.MemDb
+ version needle.Version
+ nm *needle_map.MemDb
+ nmDeleted *needle_map.MemDb
+ includeDeleted bool
}
func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
@@ -50,13 +53,20 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
}
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
- glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
+ glog.V(2).Infof("key %v offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
if n.Size.IsValid() {
- pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
- glog.V(2).Infof("saved %d with error %v", n.Size, pe)
+ if pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size); pe != nil {
+ return fmt.Errorf("saved %d with error %v", n.Size, pe)
+ }
} else {
- glog.V(2).Infof("skipping deleted file ...")
- return scanner.nm.Delete(n.Id)
+ if scanner.includeDeleted {
+ if pe := scanner.nmDeleted.Set(n.Id, types.ToOffset(offset), types.TombstoneFileSize); pe != nil {
+ return fmt.Errorf("saved deleted %d with error %v", n.Size, pe)
+ }
+ } else {
+ glog.V(2).Infof("skipping deleted file ...")
+ return scanner.nm.Delete(n.Id)
+ }
}
return nil
}
@@ -109,21 +119,45 @@ func runFix(cmd *Command, args []string) bool {
if *fixVolumeId != 0 && *fixVolumeId != volumeId {
continue
}
- doFixOneVolume(basePath, baseFileName, collection, volumeId)
+ doFixOneVolume(basePath, baseFileName, collection, volumeId, *fixIncludeDeleted)
}
}
return true
}
-func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64) {
+func SaveToIdx(scaner *VolumeFileScanner4Fix, idxName string) (ret error) {
+ idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if err != nil {
+ return
+ }
+ defer func() {
+ idxFile.Close()
+ }()
+
+ return scaner.nm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ _, err := idxFile.Write(value.ToBytes())
+ if scaner.includeDeleted && err == nil {
+ if deleted, ok := scaner.nmDeleted.Get(value.Key); ok {
+ _, err = idxFile.Write(deleted.ToBytes())
+ }
+ }
+ return err
+ })
+}
+
+func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64, fixIncludeDeleted bool) {
indexFileName := path.Join(basepath, baseFileName+".idx")
nm := needle_map.NewMemDb()
+ nmDeleted := needle_map.NewMemDb()
defer nm.Close()
+ defer nmDeleted.Close()
vid := needle.VolumeId(volumeId)
scanner := &VolumeFileScanner4Fix{
- nm: nm,
+ nm: nm,
+ nmDeleted: nmDeleted,
+ includeDeleted: fixIncludeDeleted,
}
if err := storage.ScanVolumeFile(basepath, collection, vid, storage.NeedleMapInMemory, scanner); err != nil {
@@ -135,12 +169,12 @@ func doFixOneVolume(basepath string, baseFileName string, collection string, vol
}
}
- if err := nm.SaveToIdx(indexFileName); err != nil {
- os.Remove(indexFileName)
+ if err := SaveToIdx(scanner, indexFileName); err != nil {
err := fmt.Errorf("save to .idx File: %v", err)
if *fixIgnoreError {
glog.Error(err)
} else {
+ os.Remove(indexFileName)
glog.Fatal(err)
}
}