diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2019-12-30 13:05:50 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-12-30 13:05:50 +0800 |
| commit | 70da715d8d917527291b35fb069fac077d17b868 (patch) | |
| tree | b89bad02094cc7131bc2c9f64df13e15f9de9914 /unmaintained | |
| parent | 93a7df500ffeed766e395907e860b1733040ff23 (diff) | |
| parent | 09043c8e5a3b43add589344d28d4f57e90c83f70 (diff) | |
| download | seaweedfs-70da715d8d917527291b35fb069fac077d17b868.tar.xz seaweedfs-70da715d8d917527291b35fb069fac077d17b868.zip | |
Merge pull request #4 from chrislusf/master
Syncing to the original repository
Diffstat (limited to 'unmaintained')
| -rw-r--r-- | unmaintained/change_superblock/change_superblock.go | 10 | ||||
| -rw-r--r-- | unmaintained/fix_dat/fix_dat.go | 27 | ||||
| -rw-r--r-- | unmaintained/load_test/load_test_leveldb/load_test_leveldb.go | 13 | ||||
| -rw-r--r-- | unmaintained/remove_duplicate_fids/remove_duplicate_fids.go | 95 | ||||
| -rw-r--r-- | unmaintained/see_dat/see_dat.go | 11 |
5 files changed, 127 insertions, 29 deletions
diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index 07d9b94e4..afe651c4e 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -8,8 +8,9 @@ import ( "strconv" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) var ( @@ -47,9 +48,10 @@ func main() { if err != nil { glog.Fatalf("Open Volume Data File [ERROR]: %v", err) } - defer datFile.Close() + datBackend := backend.NewDiskFile(datFile) + defer datBackend.Close() - superBlock, err := storage.ReadSuperBlock(datFile) + superBlock, err := super_block.ReadSuperBlock(datBackend) if err != nil { glog.Fatalf("cannot parse existing super block: %v", err) @@ -61,7 +63,7 @@ func main() { hasChange := false if *targetReplica != "" { - replica, err := storage.NewReplicaPlacementFromString(*targetReplica) + replica, err := super_block.NewReplicaPlacementFromString(*targetReplica) if err != nil { glog.Fatalf("cannot parse target replica %s: %v", *targetReplica, err) diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index a72a78eed..d6110d870 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -9,8 +9,9 @@ import ( "strconv" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -44,11 +45,13 @@ func main() { glog.Fatalf("Read Volume Index %v", err) } defer indexFile.Close() - datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644) + datFileName := path.Join(*fixVolumePath, fileName+".dat") + datFile, err := os.OpenFile(datFileName, os.O_RDONLY, 0644) if err != nil { glog.Fatalf("Read Volume Data %v", err) } - defer datFile.Close() + datBackend := backend.NewDiskFile(datFile) + defer datBackend.Close() newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed")) if err != nil { @@ -56,21 +59,21 @@ func main() { } defer newDatFile.Close() - superBlock, err := storage.ReadSuperBlock(datFile) + superBlock, err := super_block.ReadSuperBlock(datBackend) if err != nil { glog.Fatalf("Read Volume Data superblock %v", err) } newDatFile.Write(superBlock.Bytes()) - iterateEntries(datFile, indexFile, func(n *needle.Needle, offset int64) { + iterateEntries(datBackend, indexFile, func(n *needle.Needle, offset int64) { fmt.Printf("needle id=%v name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize) - _, s, _, e := n.Append(newDatFile, superBlock.Version()) + _, s, _, e := n.Append(datBackend, superBlock.Version) fmt.Printf("size %d error %v\n", s, e) }) } -func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) { +func iterateEntries(datBackend backend.BackendStorageFile, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) { // start to read index file var readerOffset int64 bytes := make([]byte, 16) @@ -78,14 +81,14 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *needle.Needle readerOffset += int64(count) // start to read dat file - superBlock, err := storage.ReadSuperBlock(datFile) + superBlock, err := super_block.ReadSuperBlock(datBackend) if err != nil { fmt.Printf("cannot read dat file super block: %v", err) return } offset := int64(superBlock.BlockSize()) - version := superBlock.Version() - n, _, rest, err := needle.ReadNeedleHeader(datFile, version, offset) + version := superBlock.Version + n, _, rest, err := needle.ReadNeedleHeader(datBackend, version, offset) if err != nil { fmt.Printf("cannot read needle header: %v", err) return @@ -115,7 +118,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *needle.Needle fmt.Println("Recovered in f", r) } }() - if _, err = n.ReadNeedleBody(datFile, version, offset+int64(types.NeedleHeaderSize), rest); err != nil { + if _, err = n.ReadNeedleBody(datBackend, version, offset+int64(types.NeedleHeaderSize), rest); err != nil { fmt.Printf("cannot read needle body: offset %d body %d %v\n", offset, rest, err) } }() @@ -127,7 +130,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *needle.Needle offset += types.NeedleHeaderSize + rest //fmt.Printf("==> new entry offset %d\n", offset) - if n, _, rest, err = needle.ReadNeedleHeader(datFile, version, offset); err != nil { + if n, _, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { if err == io.EOF { return } diff --git a/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go b/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go index 518a5081c..43dfb0e21 100644 --- a/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go +++ b/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go @@ -16,7 +16,7 @@ import ( ) var ( - dir = flag.String("dir", "./t", "directory to store level db files") + dir = flag.String("dir", "./t", "directory to store level db files") useHash = flag.Bool("isHash", false, "hash the path as the key") dbCount = flag.Int("dbCount", 1, "the number of leveldb") ) @@ -36,7 +36,7 @@ func main() { var dbs []*leveldb.DB var chans []chan string - for d := 0 ; d < *dbCount; d++ { + for d := 0; d < *dbCount; d++ { dbFolder := fmt.Sprintf("%s/%02d", *dir, d) os.MkdirAll(dbFolder, 0755) db, err := leveldb.OpenFile(dbFolder, opts) @@ -49,9 +49,9 @@ func main() { } var wg sync.WaitGroup - for d := 0 ; d < *dbCount; d++ { + for d := 0; d < *dbCount; d++ { wg.Add(1) - go func(d int){ + go func(d int) { defer wg.Done() ch := chans[d] @@ -60,14 +60,13 @@ func main() { for p := range ch { if *useHash { insertAsHash(db, p) - }else{ + } else { insertAsFullPath(db, p) } } }(d) } - counter := int64(0) lastResetTime := time.Now() @@ -101,7 +100,7 @@ func main() { } } - for d := 0 ; d < *dbCount; d++ { + for d := 0; d < *dbCount; d++ { close(chans[d]) } diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go new file mode 100644 index 000000000..84173a663 --- /dev/null +++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go @@ -0,0 +1,95 @@ +package main + +import ( + "flag" + "fmt" + "os" + "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +var ( + volumePath = flag.String("dir", "/tmp", "data directory to store files") + volumeCollection = flag.String("collection", "", "the volume collection name") + volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") +) + +func Checksum(n *needle.Needle) string { + return fmt.Sprintf("%s%x", n.Id, n.Cookie) +} + +type VolumeFileScanner4SeeDat struct { + version needle.Version + block super_block.SuperBlock + + dir string + hashes map[string]bool + dat *os.File + datBackend backend.BackendStorageFile +} + +func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version + scanner.block = superBlock + return nil + +} +func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { + return true +} + +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + + if scanner.datBackend == nil { + newFileName := filepath.Join(*volumePath, "dat_fixed") + newDatFile, err := os.Create(newFileName) + if err != nil { + glog.Fatalf("Write New Volume Data %v", err) + } + scanner.datBackend = backend.NewDiskFile(newDatFile) + scanner.datBackend.WriteAt(scanner.block.Bytes(), 0) + } + + checksum := Checksum(n) + + if scanner.hashes[checksum] { + glog.V(0).Infof("duplicate checksum:%s fid:%d,%s%x @ offset:%d", checksum, *volumeId, n.Id, n.Cookie, offset) + return nil + } + scanner.hashes[checksum] = true + + _, s, _, e := n.Append(scanner.datBackend, scanner.version) + fmt.Printf("size %d error %v\n", s, e) + + return nil +} + +func main() { + flag.Parse() + + vid := needle.VolumeId(*volumeId) + + outpath, _ := filepath.Abs(filepath.Dir(os.Args[0])) + + scanner := &VolumeFileScanner4SeeDat{ + dir: filepath.Join(outpath, "out"), + hashes: map[string]bool{}, + } + + if _, err := os.Stat(scanner.dir); err != nil { + if err := os.MkdirAll(scanner.dir, os.ModePerm); err != nil { + glog.Fatalf("could not create output dir : %s", err) + } + } + + err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner) + if err != nil { + glog.Fatalf("Reading Volume File [ERROR] %s\n", err) + } + +} diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go index e8e54fd4f..efc58e751 100644 --- a/unmaintained/see_dat/see_dat.go +++ b/unmaintained/see_dat/see_dat.go @@ -2,12 +2,12 @@ package main import ( "flag" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" - - "time" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) var ( @@ -20,8 +20,8 @@ type VolumeFileScanner4SeeDat struct { version needle.Version } -func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock storage.SuperBlock) error { - scanner.version = superBlock.Version() +func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version return nil } @@ -29,7 +29,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { return true } -func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error { +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second)) glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x appendedAt %v", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie, t) return nil @@ -45,5 +45,4 @@ func main() { if err != nil { glog.Fatalf("Reading Volume File [ERROR] %s\n", err) } - } |
