aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/shell/command_volume_fsck.go163
1 files changed, 154 insertions, 9 deletions
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index d6c9e796f..400e96fe7 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -1,6 +1,7 @@
package shell
import (
+ "bufio"
"context"
"flag"
"fmt"
@@ -44,6 +45,12 @@ func (c *commandVolumeFsck) Help() string {
2. collect all file ids from the filer, as set B
3. find out the set A subtract B
+ If -findMissingChunksInFiler is enabled, this works
+ in a reverse way:
+ 1. collect all file ids from all volumes, as set A
+ 2. collect all file ids from the filer, as set B
+ 3. find out the set B subtract A
+
`
}
@@ -55,6 +62,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
verbose := fsckCommand.Bool("v", false, "verbose mode")
+ findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see help volume.fsck")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -86,21 +94,105 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
}
}
- // collect all filer file ids
- if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
- return fmt.Errorf("failed to collect file ids from filer: %v", err)
+ if *findMissingChunksInFiler {
+ // collect all filer file ids and paths
+ if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
+ return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
+ }
+ // for each volume, check filer file ids
+ if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
+ return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
+ }
+ } else {
+ // collect all filer file ids
+ if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
+ return fmt.Errorf("failed to collect file ids from filer: %v", err)
+ }
+ // volume file ids substract filer file ids
+ if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
+ return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
+ }
}
- // volume file ids substract filer file ids
- err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, verbose, applyPurging)
+ return nil
+}
+
+func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
+
+ if verbose {
+ fmt.Fprintf(writer, "checking each file from filer ...\n")
+ }
+
+ files := make(map[uint32]*os.File)
+ for vid := range volumeIdToServer {
+ dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
+ }
+ files[vid] = dst
+ }
+ defer func() {
+ for _, f := range files {
+ f.Close()
+ }
+ }()
- return err
+ type Item struct {
+ vid uint32
+ fileKey uint64
+ cookie uint32
+ path util.FullPath
+ }
+ return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
+ buffer := make([]byte, 8)
+ for item := range outputChan {
+ i := item.(*Item)
+ if f, ok := files[i.vid]; ok {
+ util.Uint64toBytes(buffer, i.fileKey)
+ f.Write(buffer)
+ util.Uint32toBytes(buffer, i.cookie)
+ util.Uint32toBytes(buffer[4:], uint32(len(i.path)))
+ f.Write(buffer)
+ f.Write([]byte(i.path))
+ } else {
+ fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
+ }
+ }
+ }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
+ if resolveErr != nil {
+ return nil
+ }
+ dChunks = append(dChunks, mChunks...)
+ for _, chunk := range dChunks {
+ outputChan <- &Item{
+ vid: chunk.Fid.VolumeId,
+ fileKey: chunk.Fid.FileKey,
+ cookie: chunk.Fid.Cookie,
+ path: util.NewFullPath(entry.Dir, entry.Entry.Name),
+ }
+ }
+ return nil
+ })
+
+ return nil
}
-func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose *bool, applyPurging *bool) (error) {
+func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
+
+ for volumeId, vinfo := range volumeIdToVInfo {
+ checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose)
+ if checkErr != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
+ }
+ }
+ return nil
+}
+
+func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
for volumeId, vinfo := range volumeIdToVInfo {
- inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose)
+ inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
@@ -108,7 +200,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u
totalOrphanChunkCount += uint64(len(orphanFileIds))
totalOrphanDataSize += orphanDataSize
- if *verbose {
+ if verbose {
for _, fid := range orphanFileIds {
fmt.Fprintf(writer, "%sxxxxxxxx\n", fid)
}
@@ -223,6 +315,59 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
})
}
+func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) {
+
+ db := needle_map.NewMemDb()
+ defer db.Close()
+
+ if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
+ return
+ }
+
+ file := getFilerFileIdFile(tempFolder, volumeId)
+ fp, err := os.Open(file)
+ if err != nil {
+ return
+ }
+ defer fp.Close()
+
+ type Item struct {
+ fileKey uint64
+ cookie uint32
+ path util.FullPath
+ }
+
+ br := bufio.NewReader(fp)
+ buffer := make([]byte, 16)
+ item := &Item{}
+ var readSize int
+ for {
+ readSize, err = br.Read(buffer)
+ if err != nil || readSize != 16 {
+ if err == io.EOF {
+ return nil
+ } else {
+ break
+ }
+ }
+
+ item.fileKey = util.BytesToUint64(buffer[:8])
+ item.cookie = util.BytesToUint32(buffer[8:12])
+ pathSize := util.BytesToUint32(buffer[12:16])
+ pathBytes := make([]byte, int(pathSize))
+ _, err = br.Read(pathBytes)
+ item.path = util.FullPath(string(pathBytes))
+
+ if _, found := db.Get(types.NeedleId(item.fileKey)); !found {
+ fmt.Fprintf(writer, "%d,%x%08x in %s not found\n", volumeId, item.fileKey, item.cookie, item.path)
+ }
+
+ }
+
+ return
+
+}
+
func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
db := needle_map.NewMemDb()