aboutsummaryrefslogtreecommitdiff
path: root/weed/command/fix.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/fix.go')
-rw-r--r--weed/command/fix.go68
1 files changed, 42 insertions, 26 deletions
diff --git a/weed/command/fix.go b/weed/command/fix.go
index 3643c9d58..90d1c4893 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -7,6 +7,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -28,6 +31,32 @@ var (
fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
)
+type VolumeFileScanner4Fix struct {
+ version needle.Version
+ nm *needle_map.MemDb
+}
+
+func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
+ return nil
+
+}
+func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
+ return false
+}
+
+func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
+ if n.Size > 0 && n.Size != types.TombstoneFileSize {
+ pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
+ glog.V(2).Infof("saved %d with error %v", n.Size, pe)
+ } else {
+ glog.V(2).Infof("skipping deleted file ...")
+ return scanner.nm.Delete(n.Id)
+ }
+ return nil
+}
+
func runFix(cmd *Command, args []string) bool {
if *fixVolumeId == -1 {
@@ -39,35 +68,22 @@ func runFix(cmd *Command, args []string) bool {
baseFileName = *fixVolumeCollection + "_" + baseFileName
}
indexFileName := path.Join(*fixVolumePath, baseFileName+".idx")
- indexFile, err := os.OpenFile(indexFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if err != nil {
- glog.Fatalf("Create Volume Index [ERROR] %s\n", err)
- }
- defer indexFile.Close()
- nm := storage.NewBtreeNeedleMap(indexFile)
+ nm := needle_map.NewMemDb()
defer nm.Close()
- var version storage.Version
- vid := storage.VolumeId(*fixVolumeId)
- err = storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid,
- storage.NeedleMapInMemory,
- func(superBlock storage.SuperBlock) error {
- version = superBlock.Version()
- return nil
- }, false, func(n *storage.Needle, offset int64) error {
- glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(version), n.IsGzipped())
- if n.Size > 0 {
- pe := nm.Put(n.Id, types.Offset(offset/types.NeedlePaddingSize), n.Size)
- glog.V(2).Infof("saved %d with error %v", n.Size, pe)
- } else {
- glog.V(2).Infof("skipping deleted file ...")
- return nm.Delete(n.Id, types.Offset(offset/types.NeedlePaddingSize))
- }
- return nil
- })
- if err != nil {
- glog.Fatalf("Export Volume File [ERROR] %s\n", err)
+ vid := needle.VolumeId(*fixVolumeId)
+ scanner := &VolumeFileScanner4Fix{
+ nm: nm,
+ }
+
+ if err := storage.ScanVolumeFile(*fixVolumePath, *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
+ glog.Fatalf("scan .dat File: %v", err)
+ os.Remove(indexFileName)
+ }
+
+ if err := nm.SaveToIdx(indexFileName); err != nil {
+ glog.Fatalf("save to .idx File: %v", err)
os.Remove(indexFileName)
}