diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2024-05-12 23:31:34 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-05-12 11:31:34 -0700 |
| commit | d389c5b27e6ceef6fb623c0c2b8405c754d3ac5d (patch) | |
| tree | 4a5a7272e3a652468e442f5306744ec27cd043dc /weed/command | |
| parent | 731b3aadbeacc7754f5b91ce1b3f9b96eb428f3f (diff) | |
| download | seaweedfs-d389c5b27e6ceef6fb623c0c2b8405c754d3ac5d.tar.xz seaweedfs-d389c5b27e6ceef6fb623c0c2b8405c754d3ac5d.zip | |
fix: recreate index include deleted files (#5579)
* fix: recreate index include deleted files
https://github.com/seaweedfs/seaweedfs/issues/5508
* fix: counting the number of files
* fix: log
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/fix.go | 58 |
1 files changed, 46 insertions, 12 deletions
diff --git a/weed/command/fix.go b/weed/command/fix.go index b226a0b1a..4fb4ed88e 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -32,12 +32,15 @@ var cmdFix = &Command{ var ( fixVolumeCollection = cmdFix.Flag.String("collection", "", "an optional volume collection name, if specified only it will be processed") fixVolumeId = cmdFix.Flag.Int64("volumeId", 0, "an optional volume id, if not 0 (default) only it will be processed") + fixIncludeDeleted = cmdFix.Flag.Bool("includeDeleted", true, "include deleted entries in the index file") fixIgnoreError = cmdFix.Flag.Bool("ignoreError", false, "an optional, if true will be processed despite errors") ) type VolumeFileScanner4Fix struct { - version needle.Version - nm *needle_map.MemDb + version needle.Version + nm *needle_map.MemDb + nmDeleted *needle_map.MemDb + includeDeleted bool } func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error { @@ -50,13 +53,20 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { } func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { - glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed()) + glog.V(2).Infof("key %v offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed()) if n.Size.IsValid() { - pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size) - glog.V(2).Infof("saved %d with error %v", n.Size, pe) + if pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size); pe != nil { + return fmt.Errorf("saved %d with error %v", n.Size, pe) + } } else { - glog.V(2).Infof("skipping deleted file ...") - return scanner.nm.Delete(n.Id) + if scanner.includeDeleted { + if pe := scanner.nmDeleted.Set(n.Id, types.ToOffset(offset), types.TombstoneFileSize); pe != nil { + return fmt.Errorf("saved deleted %d with error %v", n.Size, pe) + } + } else { + glog.V(2).Infof("skipping deleted file ...") + return scanner.nm.Delete(n.Id) + } } return nil } @@ -109,21 +119,45 @@ func runFix(cmd *Command, args []string) bool { if *fixVolumeId != 0 && *fixVolumeId != volumeId { continue } - doFixOneVolume(basePath, baseFileName, collection, volumeId) + doFixOneVolume(basePath, baseFileName, collection, volumeId, *fixIncludeDeleted) } } return true } -func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64) { +func SaveToIdx(scaner *VolumeFileScanner4Fix, idxName string) (ret error) { + idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return + } + defer func() { + idxFile.Close() + }() + + return scaner.nm.AscendingVisit(func(value needle_map.NeedleValue) error { + _, err := idxFile.Write(value.ToBytes()) + if scaner.includeDeleted && err == nil { + if deleted, ok := scaner.nmDeleted.Get(value.Key); ok { + _, err = idxFile.Write(deleted.ToBytes()) + } + } + return err + }) +} + +func doFixOneVolume(basepath string, baseFileName string, collection string, volumeId int64, fixIncludeDeleted bool) { indexFileName := path.Join(basepath, baseFileName+".idx") nm := needle_map.NewMemDb() + nmDeleted := needle_map.NewMemDb() defer nm.Close() + defer nmDeleted.Close() vid := needle.VolumeId(volumeId) scanner := &VolumeFileScanner4Fix{ - nm: nm, + nm: nm, + nmDeleted: nmDeleted, + includeDeleted: fixIncludeDeleted, } if err := storage.ScanVolumeFile(basepath, collection, vid, storage.NeedleMapInMemory, scanner); err != nil { @@ -135,12 +169,12 @@ func doFixOneVolume(basepath string, baseFileName string, collection string, vol } } - if err := nm.SaveToIdx(indexFileName); err != nil { - os.Remove(indexFileName) + if err := SaveToIdx(scanner, indexFileName); err != nil { err := fmt.Errorf("save to .idx File: %v", err) if *fixIgnoreError { glog.Error(err) } else { + os.Remove(indexFileName) glog.Fatal(err) } } |
