aboutsummaryrefslogtreecommitdiff
path: root/weed/command/fix.go
blob: 7b99c1d775610b32184da555d6d850c89804cbef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package command

import (
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"strconv"
	"strings"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
	"github.com/chrislusf/seaweedfs/weed/storage/types"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// init attaches runFix to cmdFix during package initialization rather than in
// the cmdFix literal: runFix reads flags that are registered on cmdFix.Flag, so
// naming it inside the literal would make cmdFix depend on itself.
func init() {
	cmdFix.Run = runFix // break init cycle
}

// cmdFix describes the "fix" command, which re-creates a volume's .idx index
// file. Its Run function is assigned in init, and its flags are registered in
// the var block that follows.
var cmdFix = &Command{
	UsageLine: "fix -dir=/tmp -volumeId=234",
	Short:     "run weed tool fix on index file if corrupted",
	Long: `Fix runs the SeaweedFS fix command to re-create the index .idx file.

  `,
}

// Command-line flags of the fix command, registered on cmdFix.Flag.
var (
	// fixVolumePath is the directory holding the volume files; runFix resolves
	// it with util.ResolvePath before use.
	fixVolumePath       = cmdFix.Flag.String("dir", ".", "data directory to store files")
	// fixVolumeCollection is the collection name. It is passed along when a
	// single volume is fixed and filters the .dat files when a directory is
	// scanned.
	fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name")
	// fixVolumeId selects a single volume. When it is 0 (the default), runFix
	// walks every .dat file found in the directory instead.
	fixVolumeId         = cmdFix.Flag.Int("volumeId", 0, "an optional volume id.")
)

// VolumeFileScanner4Fix is the scanner that doFixOneVolume hands to
// storage.ScanVolumeFile. It records the needles it is shown in an in-memory
// needle map, from which the .idx file is later written.
type VolumeFileScanner4Fix struct {
	// version is the volume version copied from the super block in
	// VisitSuperBlock.
	version needle.Version
	// nm collects the needle entries; it is supplied by the creator of the
	// scanner.
	nm      *needle_map.MemDb
}

// VisitSuperBlock remembers the version declared in the volume's super block
// so that VisitNeedle can use it later. It always returns nil.
func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
	scanner.version = superBlock.Version
	return nil
}
// ReadNeedleBody always reports false. VisitNeedle never looks at the needle
// body, so the body is not needed here.
// NOTE(review): presumably the scan driver uses this answer to skip loading
// needle bodies — confirm against storage.ScanVolumeFile.
func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
	return false
}

// VisitNeedle records one needle found in the .dat file in the in-memory
// needle map.
//
// n is the needle and offset is the position it was found at; needleHeader and
// needleBody are not used. A needle whose size is not valid is treated as a
// deleted file and its key is removed from the map; any other needle is stored
// under its id with its offset and size.
//
// The error of the map operation is returned in both cases, so a failed insert
// is no longer silently dropped from the rebuilt index.
func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
	glog.V(2).Infof("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())

	if !n.Size.IsValid() {
		glog.V(2).Infof("skipping deleted file ...")
		return scanner.nm.Delete(n.Id)
	}

	if err := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size); err != nil {
		return fmt.Errorf("save needle %d at offset %d: %w", n.Id, offset, err)
	}
	glog.V(2).Infof("saved %d", n.Size)
	return nil
}

// runFix is the entry point of the fix command.
//
// When -volumeId is non-zero, only that volume (in the -collection given, if
// any) is fixed. Otherwise every regular "*.dat" file in the -dir directory is
// fixed; file names are read as "<volumeId>.dat" or
// "<collection>_<volumeId>.dat", splitting at the last underscore. If
// -collection is set, only files whose collection is exactly that name are
// processed.
//
// It returns false when the directory cannot be listed or when a candidate
// file name does not carry a numeric volume id, and true otherwise. cmd and
// args are not used.
func runFix(cmd *Command, args []string) bool {
	dir := util.ResolvePath(*fixVolumePath)
	if *fixVolumeId != 0 {
		doFixOneVolume(dir, *fixVolumeCollection, needle.VolumeId(*fixVolumeId))
		return true
	}

	files, err := ioutil.ReadDir(dir)
	if err != nil {
		fmt.Println(err)
		return false
	}

	for _, file := range files {
		// A directory can never be a volume data file, whatever it is called.
		if file.IsDir() || !strings.HasSuffix(file.Name(), ".dat") {
			continue
		}

		baseFileName := strings.TrimSuffix(file.Name(), ".dat")
		collection, volumeIdStr := "", baseFileName
		if sepIndex := strings.LastIndex(baseFileName, "_"); sepIndex > 0 {
			collection = baseFileName[:sepIndex]
			volumeIdStr = baseFileName[sepIndex+1:]
		}

		// Compare the parsed collection exactly: a prefix test on the file name
		// would let -collection=a also pick up files of a collection "a_b".
		// Filtering before the id is parsed keeps unrelated files from
		// aborting the run.
		if *fixVolumeCollection != "" && collection != *fixVolumeCollection {
			continue
		}

		volumeId, parseErr := strconv.ParseInt(volumeIdStr, 10, 64)
		if parseErr != nil {
			fmt.Printf("Failed to parse volume id from %s: %v\n", baseFileName, parseErr)
			return false
		}
		doFixOneVolume(dir, collection, needle.VolumeId(volumeId))
	}

	return true
}

// doFixOneVolume rebuilds the index file of one volume from its data file.
//
// dir is the directory holding the volume files, collection is the volume's
// collection name (empty for none) and volumeId identifies the volume. The
// volume is scanned into an in-memory needle map, which is then saved as
// "<dir>/<volumeId>.idx", or "<dir>/<collection>_<volumeId>.idx" when a
// collection is given.
//
// Failures are reported with glog.Fatalf, which ends the process, so the
// function only returns on success.
func doFixOneVolume(dir, collection string, volumeId needle.VolumeId) {
	baseFileName := strconv.Itoa(int(volumeId))
	if collection != "" {
		baseFileName = collection + "_" + baseFileName
	}
	indexFileName := path.Join(dir, baseFileName+".idx")

	nm := needle_map.NewMemDb()
	defer nm.Close()

	scanner := &VolumeFileScanner4Fix{
		nm: nm,
	}

	// Scan the volume that was asked for. Reading the id from the -volumeId
	// flag here would scan volume 0 for every file when a whole directory is
	// being fixed, because the flag is 0 in that mode.
	if err := storage.ScanVolumeFile(dir, collection, volumeId, storage.NeedleMapInMemory, scanner); err != nil {
		// Nothing has been written yet, so the existing index file is left
		// untouched.
		glog.Fatalf("scan .dat File: %v", err)
	}

	if err := nm.SaveToIdx(indexFileName); err != nil {
		// Best-effort removal of a possibly half-written index. It has to run
		// before Fatalf, which does not return.
		os.Remove(indexFileName)
		glog.Fatalf("save to .idx File: %v", err)
	}
}