diff options
Diffstat (limited to 'unmaintained')
| -rw-r--r-- | unmaintained/change_superblock/change_superblock.go | 13 | ||||
| -rw-r--r-- | unmaintained/check_disk_size/check_disk_size.go | 42 | ||||
| -rw-r--r-- | unmaintained/compact_leveldb/compact_leveldb.go | 39 | ||||
| -rw-r--r-- | unmaintained/fix_dat/fix_dat.go | 32 | ||||
| -rw-r--r-- | unmaintained/load_test/load_test_leveldb/load_test_leveldb.go | 155 | ||||
| -rw-r--r-- | unmaintained/remove_duplicate_fids/remove_duplicate_fids.go | 95 | ||||
| -rw-r--r-- | unmaintained/repeated_vacuum/repeated_vacuum.go | 62 | ||||
| -rw-r--r-- | unmaintained/see_dat/see_dat.go | 22 | ||||
| -rw-r--r-- | unmaintained/see_dat/see_dat_gzip.go | 83 | ||||
| -rw-r--r-- | unmaintained/see_idx/see_idx.go | 7 | ||||
| -rw-r--r-- | unmaintained/see_log_entry/see_log_entry.go | 75 | ||||
| -rw-r--r-- | unmaintained/see_meta/see_meta.go | 68 | ||||
| -rw-r--r-- | unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go | 136 | ||||
| -rw-r--r-- | unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go | 150 | ||||
| -rw-r--r-- | unmaintained/volume_tailer/volume_tailer.go | 69 |
15 files changed, 1001 insertions, 47 deletions
diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go index 779580a9b..afe651c4e 100644 --- a/unmaintained/change_superblock/change_superblock.go +++ b/unmaintained/change_superblock/change_superblock.go @@ -8,7 +8,9 @@ import ( "strconv" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) var ( @@ -46,9 +48,10 @@ func main() { if err != nil { glog.Fatalf("Open Volume Data File [ERROR]: %v", err) } - defer datFile.Close() + datBackend := backend.NewDiskFile(datFile) + defer datBackend.Close() - superBlock, err := storage.ReadSuperBlock(datFile) + superBlock, err := super_block.ReadSuperBlock(datBackend) if err != nil { glog.Fatalf("cannot parse existing super block: %v", err) @@ -60,7 +63,7 @@ func main() { hasChange := false if *targetReplica != "" { - replica, err := storage.NewReplicaPlacementFromString(*targetReplica) + replica, err := super_block.NewReplicaPlacementFromString(*targetReplica) if err != nil { glog.Fatalf("cannot parse target replica %s: %v", *targetReplica, err) @@ -73,7 +76,7 @@ func main() { } if *targetTTL != "" { - ttl, err := storage.ReadTTL(*targetTTL) + ttl, err := needle.ReadTTL(*targetTTL) if err != nil { glog.Fatalf("cannot parse target ttl %s: %v", *targetTTL, err) diff --git a/unmaintained/check_disk_size/check_disk_size.go b/unmaintained/check_disk_size/check_disk_size.go new file mode 100644 index 000000000..4a8b92b88 --- /dev/null +++ b/unmaintained/check_disk_size/check_disk_size.go @@ -0,0 +1,42 @@ +package main + +import ( + "flag" + "fmt" + "runtime" + "syscall" +) + +var ( + dir = flag.String("dir", ".", "the directory which uses a disk") +) + +func main() { + flag.Parse() + + fillInDiskStatus(*dir) + + fmt.Printf("OS: %v\n", runtime.GOOS) + fmt.Printf("Arch: %v\n", runtime.GOARCH) + +} + +func fillInDiskStatus(dir string) { + fs := syscall.Statfs_t{} + err := syscall.Statfs(dir, &fs) + if err != nil { + fmt.Printf("failed to statfs on %s: %v\n", dir, err) + return + } + fmt.Printf("statfs: %+v\n", fs) + fmt.Println() + + total := fs.Blocks * uint64(fs.Bsize) + free := fs.Bfree * uint64(fs.Bsize) + fmt.Printf("Total: %d blocks x %d block size = %d bytes\n", fs.Blocks, uint64(fs.Bsize), total) + fmt.Printf("Free : %d blocks x %d block size = %d bytes\n", fs.Bfree, uint64(fs.Bsize), free) + fmt.Printf("Used : %d blocks x %d block size = %d bytes\n", fs.Blocks-fs.Bfree, uint64(fs.Bsize), total-free) + fmt.Printf("Free Percentage : %.2f%%\n", float32((float64(free)/float64(total))*100)) + fmt.Printf("Used Percentage : %.2f%%\n", float32((float64(total-free)/float64(total))*100)) + return +} diff --git a/unmaintained/compact_leveldb/compact_leveldb.go b/unmaintained/compact_leveldb/compact_leveldb.go new file mode 100644 index 000000000..9be5697de --- /dev/null +++ b/unmaintained/compact_leveldb/compact_leveldb.go @@ -0,0 +1,39 @@ +package main + +import ( + "flag" + "log" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var ( + dir = flag.String("dir", ".", "data directory to store leveldb files") +) + +func main() { + + flag.Parse() + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, + OpenFilesCacheCapacity: -1, + } + + db, err := leveldb.OpenFile(*dir, opts) + if errors.IsCorrupted(err) { + db, err = leveldb.RecoverFile(*dir, opts) + } + if err != nil { + log.Fatal(err) + } + defer db.Close() + if err := db.CompactRange(util.Range{}); err != nil { + log.Fatal(err) + } +} diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go index 9eb64b3b4..d6110d870 100644 --- a/unmaintained/fix_dat/fix_dat.go +++ b/unmaintained/fix_dat/fix_dat.go @@ -9,7 +9,9 @@ import ( "strconv" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -43,11 +45,13 @@ func main() { glog.Fatalf("Read Volume Index %v", err) } defer indexFile.Close() - datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644) + datFileName := path.Join(*fixVolumePath, fileName+".dat") + datFile, err := os.OpenFile(datFileName, os.O_RDONLY, 0644) if err != nil { glog.Fatalf("Read Volume Data %v", err) } - defer datFile.Close() + datBackend := backend.NewDiskFile(datFile) + defer datBackend.Close() newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed")) if err != nil { @@ -55,21 +59,21 @@ func main() { } defer newDatFile.Close() - superBlock, err := storage.ReadSuperBlock(datFile) + superBlock, err := super_block.ReadSuperBlock(datBackend) if err != nil { glog.Fatalf("Read Volume Data superblock %v", err) } newDatFile.Write(superBlock.Bytes()) - iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) { + iterateEntries(datBackend, indexFile, func(n *needle.Needle, offset int64) { fmt.Printf("needle id=%v name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize) - _, s, _, e := n.Append(newDatFile, superBlock.Version()) + _, s, _, e := n.Append(datBackend, superBlock.Version) fmt.Printf("size %d error %v\n", s, e) }) } -func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needle, offset int64)) { +func iterateEntries(datBackend backend.BackendStorageFile, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) { // start to read index file var readerOffset int64 bytes := make([]byte, 16) @@ -77,14 +81,14 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl readerOffset += int64(count) // start to read dat file - superBlock, err := storage.ReadSuperBlock(datFile) + superBlock, err := super_block.ReadSuperBlock(datBackend) if err != nil { fmt.Printf("cannot read dat file super block: %v", err) return } offset := int64(superBlock.BlockSize()) - version := superBlock.Version() - n, rest, err := storage.ReadNeedleHeader(datFile, version, offset) + version := superBlock.Version + n, _, rest, err := needle.ReadNeedleHeader(datBackend, version, offset) if err != nil { fmt.Printf("cannot read needle header: %v", err) return @@ -106,7 +110,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl fmt.Printf("key: %d offsetFromIndex %d n.Size %d sizeFromIndex:%d\n", key, offsetFromIndex, n.Size, sizeFromIndex) - rest = storage.NeedleBodyLength(sizeFromIndex, version) + rest = needle.NeedleBodyLength(sizeFromIndex, version) func() { defer func() { @@ -114,7 +118,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl fmt.Println("Recovered in f", r) } }() - if err = n.ReadNeedleBody(datFile, version, offset+int64(types.NeedleEntrySize), rest); err != nil { + if _, err = n.ReadNeedleBody(datBackend, version, offset+int64(types.NeedleHeaderSize), rest); err != nil { fmt.Printf("cannot read needle body: offset %d body %d %v\n", offset, rest, err) } }() @@ -124,9 +128,9 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl } visitNeedle(n, offset) - offset += types.NeedleEntrySize + rest + offset += types.NeedleHeaderSize + rest //fmt.Printf("==> new entry offset %d\n", offset) - if n, rest, err = storage.ReadNeedleHeader(datFile, version, offset); err != nil { + if n, _, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { if err == io.EOF { return } diff --git a/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go b/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go new file mode 100644 index 000000000..43dfb0e21 --- /dev/null +++ b/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go @@ -0,0 +1,155 @@ +package load_test_leveldb + +import ( + "crypto/md5" + "flag" + "fmt" + "io" + "log" + "math/rand" + "os" + "sync" + "time" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +var ( + dir = flag.String("dir", "./t", "directory to store level db files") + useHash = flag.Bool("isHash", false, "hash the path as the key") + dbCount = flag.Int("dbCount", 1, "the number of leveldb") +) + +func main() { + + flag.Parse() + + totalTenants := 300 + totalYears := 3 + + opts := &opt.Options{ + BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 4, + } + + var dbs []*leveldb.DB + var chans []chan string + for d := 0; d < *dbCount; d++ { + dbFolder := fmt.Sprintf("%s/%02d", *dir, d) + os.MkdirAll(dbFolder, 0755) + db, err := leveldb.OpenFile(dbFolder, opts) + if err != nil { + log.Printf("filer store open dir %s: %v", *dir, err) + return + } + dbs = append(dbs, db) + chans = append(chans, make(chan string, 1024)) + } + + var wg sync.WaitGroup + for d := 0; d < *dbCount; d++ { + wg.Add(1) + go func(d int) { + defer wg.Done() + + ch := chans[d] + db := dbs[d] + + for p := range ch { + if *useHash { + insertAsHash(db, p) + } else { + insertAsFullPath(db, p) + } + } + }(d) + } + + counter := int64(0) + lastResetTime := time.Now() + + r := rand.New(rand.NewSource(35)) + + for y := 0; y < totalYears; y++ { + for m := 0; m < 12; m++ { + for d := 0; d < 31; d++ { + for h := 0; h < 24; h++ { + for min := 0; min < 60; min++ { + for i := 0; i < totalTenants; i++ { + p := fmt.Sprintf("tenent%03d/%4d/%02d/%02d/%02d/%02d", i, 2015+y, 1+m, 1+d, h, min) + + x := r.Intn(*dbCount) + + chans[x] <- p + + counter++ + } + + t := time.Now() + if lastResetTime.Add(time.Second).Before(t) { + p := fmt.Sprintf("%4d/%02d/%02d/%02d/%02d", 2015+y, 1+m, 1+d, h, min) + fmt.Printf("%s = %4d put/sec\n", p, counter) + counter = 0 + lastResetTime = t + } + } + } + } + } + } + + for d := 0; d < *dbCount; d++ { + close(chans[d]) + } + + wg.Wait() + +} + +func insertAsFullPath(db *leveldb.DB, p string) { + _, getErr := db.Get([]byte(p), nil) + if getErr == leveldb.ErrNotFound { + putErr := db.Put([]byte(p), []byte(p), nil) + if putErr != nil { + log.Printf("failed to put %s", p) + } + } +} + +func insertAsHash(db *leveldb.DB, p string) { + key := fmt.Sprintf("%d:%s", hashToLong(p), p) + _, getErr := db.Get([]byte(key), nil) + if getErr == leveldb.ErrNotFound { + putErr := db.Put([]byte(key), []byte(p), nil) + if putErr != nil { + log.Printf("failed to put %s", p) + } + } +} + +func hashToLong(dir string) (v int64) { + h := md5.New() + io.WriteString(h, dir) + + b := h.Sum(nil) + + v += int64(b[0]) + v <<= 8 + v += int64(b[1]) + v <<= 8 + v += int64(b[2]) + v <<= 8 + v += int64(b[3]) + v <<= 8 + v += int64(b[4]) + v <<= 8 + v += int64(b[5]) + v <<= 8 + v += int64(b[6]) + v <<= 8 + v += int64(b[7]) + + return +} diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go new file mode 100644 index 000000000..84173a663 --- /dev/null +++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go @@ -0,0 +1,95 @@ +package main + +import ( + "flag" + "fmt" + "os" + "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" +) + +var ( + volumePath = flag.String("dir", "/tmp", "data directory to store files") + volumeCollection = flag.String("collection", "", "the volume collection name") + volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") +) + +func Checksum(n *needle.Needle) string { + return fmt.Sprintf("%s%x", n.Id, n.Cookie) +} + +type VolumeFileScanner4SeeDat struct { + version needle.Version + block super_block.SuperBlock + + dir string + hashes map[string]bool + dat *os.File + datBackend backend.BackendStorageFile +} + +func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version + scanner.block = superBlock + return nil + +} +func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { + return true +} + +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + + if scanner.datBackend == nil { + newFileName := filepath.Join(*volumePath, "dat_fixed") + newDatFile, err := os.Create(newFileName) + if err != nil { + glog.Fatalf("Write New Volume Data %v", err) + } + scanner.datBackend = backend.NewDiskFile(newDatFile) + scanner.datBackend.WriteAt(scanner.block.Bytes(), 0) + } + + checksum := Checksum(n) + + if scanner.hashes[checksum] { + glog.V(0).Infof("duplicate checksum:%s fid:%d,%s%x @ offset:%d", checksum, *volumeId, n.Id, n.Cookie, offset) + return nil + } + scanner.hashes[checksum] = true + + _, s, _, e := n.Append(scanner.datBackend, scanner.version) + fmt.Printf("size %d error %v\n", s, e) + + return nil +} + +func main() { + flag.Parse() + + vid := needle.VolumeId(*volumeId) + + outpath, _ := filepath.Abs(filepath.Dir(os.Args[0])) + + scanner := &VolumeFileScanner4SeeDat{ + dir: filepath.Join(outpath, "out"), + hashes: map[string]bool{}, + } + + if _, err := os.Stat(scanner.dir); err != nil { + if err := os.MkdirAll(scanner.dir, os.ModePerm); err != nil { + glog.Fatalf("could not create output dir : %s", err) + } + } + + err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner) + if err != nil { + glog.Fatalf("Reading Volume File [ERROR] %s\n", err) + } + +} diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 7cc583f56..12ac42dbe 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -1,45 +1,73 @@ package main import ( - "bytes" "flag" "fmt" "log" "math/rand" + "time" + + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" ) var ( - master = flag.String("master", "127.0.0.1:9333", "the master server") - repeat = flag.Int("n", 5, "repeat how many times") + master = flag.String("master", "127.0.0.1:9333", "the master server") + repeat = flag.Int("n", 5, "repeat how many times") + garbageThreshold = flag.Float64("garbageThreshold", 0.3, "garbageThreshold") + replication = flag.String("replication", "", "replication 000, 001, 002, etc") ) func main() { flag.Parse() - for i := 0; i < *repeat; i++ { - assignResult, err := operation.Assign(*master, &operation.VolumeAssignRequest{Count: 1}) - if err != nil { - log.Fatalf("assign: %v", err) + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + + genFile(grpcDialOption, 0) + + go func() { + for { + println("vacuum threshold", *garbageThreshold) + _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold)) + if err != nil { + log.Fatalf("vacuum: %v", err) + } + time.Sleep(time.Second) } + }() - data := make([]byte, 1024) - rand.Read(data) - reader := bytes.NewReader(data) + for i := 0; i < *repeat; i++ { + // create 2 files, and delete one of them - targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + assignResult, targetUrl := genFile(grpcDialOption, i) - _, err = operation.Upload(targetUrl, fmt.Sprintf("test%d", i), reader, false, "", nil, "") - if err != nil { - log.Fatalf("upload: %v", err) - } + util.Delete(targetUrl, string(assignResult.Auth)) - util.Delete(targetUrl, "") + } - util.Get(fmt.Sprintf("http://%s/vol/vacuum", *master)) +} +func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) { + assignResult, err := operation.Assign(*master, grpcDialOption, &operation.VolumeAssignRequest{ + Count: 1, + Replication: *replication, + }) + if err != nil { + log.Fatalf("assign: %v", err) } + data := make([]byte, 1024) + rand.Read(data) + + targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + + _, err = operation.UploadData(targetUrl, fmt.Sprintf("test%d", i), false, data, false, "bench/test", nil, assignResult.Auth) + if err != nil { + log.Fatalf("upload: %v", err) + } + return assignResult, targetUrl } diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go index f79c0a6a9..17c494841 100644 --- a/unmaintained/see_dat/see_dat.go +++ b/unmaintained/see_dat/see_dat.go @@ -2,8 +2,13 @@ package main import ( "flag" + "github.com/chrislusf/seaweedfs/weed/util" + "time" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) var ( @@ -13,32 +18,33 @@ var ( ) type VolumeFileScanner4SeeDat struct { - version storage.Version + version needle.Version } -func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock storage.SuperBlock) error { - scanner.version = superBlock.Version() +func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version return nil } func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { - return false + return true } -func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *storage.Needle, offset int64) error { - glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie) +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second)) + glog.V(0).Infof("%d,%s%x offset %d size %d(%s) cookie %x appendedAt %v", + *volumeId, n.Id, n.Cookie, offset, n.Size, util.BytesToHumanReadable(uint64(n.Size)), n.Cookie, t) return nil } func main() { flag.Parse() - vid := storage.VolumeId(*volumeId) + vid := needle.VolumeId(*volumeId) scanner := &VolumeFileScanner4SeeDat{} err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner) if err != nil { glog.Fatalf("Reading Volume File [ERROR] %s\n", err) } - } diff --git a/unmaintained/see_dat/see_dat_gzip.go b/unmaintained/see_dat/see_dat_gzip.go new file mode 100644 index 000000000..cec073e3f --- /dev/null +++ b/unmaintained/see_dat/see_dat_gzip.go @@ -0,0 +1,83 @@ +package main + +import ( + "bytes" + "compress/gzip" + "crypto/md5" + "flag" + "io" + "io/ioutil" + "net/http" + "time" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type VolumeFileScanner4SeeDat struct { + version needle.Version +} + +func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error { + scanner.version = superBlock.Version + return nil +} + +func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool { + return true +} + +var ( + files = int64(0) + filebytes = int64(0) + diffbytes = int64(0) +) + +func Compresssion(data []byte) float64 { + if len(data) <= 128 { + return 100.0 + } + compressed, _ := util.GzipData(data[0:128]) + return float64(len(compressed)*10) / 1280.0 +} + +func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second)) + glog.V(0).Info("----------------------------------------------------------------------------------") + glog.V(0).Infof("%d,%s%x offset %d size %d(%s) cookie %x appendedAt %v hasmime[%t] mime[%s] (len: %d)", + *volumeId, n.Id, n.Cookie, offset, n.Size, util.BytesToHumanReadable(uint64(n.Size)), n.Cookie, t, n.HasMime(), string(n.Mime), len(n.Mime)) + r, err := gzip.NewReader(bytes.NewReader(n.Data)) + if err == nil { + buf := bytes.Buffer{} + h := md5.New() + c, _ := io.Copy(&buf, r) + d := buf.Bytes() + io.Copy(h, bytes.NewReader(d)) + diff := (int64(n.DataSize) - int64(c)) + diffbytes += diff + glog.V(0).Infof("was gzip! stored_size: %d orig_size: %d diff: %d(%d) mime:%s compression-of-128: %.2f md5: %x", n.DataSize, c, diff, diffbytes, http.DetectContentType(d), Compresssion(d), h.Sum(nil)) + } else { + glog.V(0).Infof("no gzip!") + } + return nil +} + +var ( + _ = ioutil.ReadAll + volumePath = flag.String("dir", "/tmp", "data directory to store files") + volumeCollection = flag.String("collection", "", "the volume collection name") + volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.") +) + +func main() { + flag.Parse() + vid := needle.VolumeId(*volumeId) + glog.V(0).Info("Starting") + scanner := &VolumeFileScanner4SeeDat{} + err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner) + if err != nil { + glog.Fatalf("Reading Volume File [ERROR] %s\n", err) + } +} diff --git a/unmaintained/see_idx/see_idx.go b/unmaintained/see_idx/see_idx.go index 23ca04c2e..47cbd291b 100644 --- a/unmaintained/see_idx/see_idx.go +++ b/unmaintained/see_idx/see_idx.go @@ -3,12 +3,13 @@ package main import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "os" "path" "strconv" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -35,8 +36,8 @@ func main() { } defer indexFile.Close() - storage.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { - fmt.Printf("key:%v offset:%v size:%v\n", key, offset, size) + idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { + fmt.Printf("key:%v offset:%v size:%v(%v)\n", key, offset, size, util.BytesToHumanReadable(uint64(size))) return nil }) diff --git a/unmaintained/see_log_entry/see_log_entry.go b/unmaintained/see_log_entry/see_log_entry.go new file mode 100644 index 000000000..34965f6be --- /dev/null +++ b/unmaintained/see_log_entry/see_log_entry.go @@ -0,0 +1,75 @@ +package main + +import ( + "flag" + "fmt" + "io" + "log" + "os" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + logdataFile = flag.String("logdata", "", "log data file saved under "+ filer2.SystemLogDir) +) + +func main() { + flag.Parse() + + dst, err := os.OpenFile(*logdataFile, os.O_RDONLY, 0644) + if err != nil { + log.Fatalf("failed to open %s: %v", *logdataFile, err) + } + defer dst.Close() + + err = walkLogEntryFile(dst) + if err != nil { + log.Fatalf("failed to visit %s: %v", *logdataFile, err) + } + +} + +func walkLogEntryFile(dst *os.File) error { + + sizeBuf := make([]byte, 4) + + for { + if n, err := dst.Read(sizeBuf); n != 4 { + if err == io.EOF { + return nil + } + return err + } + + size := util.BytesToUint32(sizeBuf) + + data := make([]byte, int(size)) + + if n, err := dst.Read(data); n != len(data) { + return err + } + + logEntry := &filer_pb.LogEntry{} + err := proto.Unmarshal(data, logEntry) + if err != nil { + log.Printf("unexpected unmarshal filer_pb.LogEntry: %v", err) + return nil + } + + event := &filer_pb.SubscribeMetadataResponse{} + err = proto.Unmarshal(logEntry.Data, event) + if err != nil { + log.Printf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return nil + } + + fmt.Printf("event: %+v\n", event) + + } + +} diff --git a/unmaintained/see_meta/see_meta.go b/unmaintained/see_meta/see_meta.go new file mode 100644 index 000000000..452badfd6 --- /dev/null +++ b/unmaintained/see_meta/see_meta.go @@ -0,0 +1,68 @@ +package main + +import ( + "flag" + "fmt" + "io" + "log" + "os" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + metaFile = flag.String("meta", "", "meta file generated via fs.meta.save") +) + +func main() { + flag.Parse() + + dst, err := os.OpenFile(*metaFile, os.O_RDONLY, 0644) + if err != nil { + log.Fatalf("failed to open %s: %v", *metaFile, err) + } + defer dst.Close() + + err = walkMetaFile(dst) + if err != nil { + log.Fatalf("failed to visit %s: %v", *metaFile, err) + } + +} + +func walkMetaFile(dst *os.File) error { + + sizeBuf := make([]byte, 4) + + for { + if n, err := dst.Read(sizeBuf); n != 4 { + if err == io.EOF { + return nil + } + return err + } + + size := util.BytesToUint32(sizeBuf) + + data := make([]byte, int(size)) + + if n, err := dst.Read(data); n != len(data) { + return err + } + + fullEntry := &filer_pb.FullEntry{} + if err := proto.Unmarshal(data, fullEntry); err != nil { + return err + } + + fmt.Fprintf(os.Stdout, "file %s %v\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name), fullEntry.Entry.Attributes.String()) + for i, chunk := range fullEntry.Entry.Chunks { + fmt.Fprintf(os.Stdout, " chunk %d %v\n", i+1, chunk.String()) + } + + } + +} diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go new file mode 100644 index 000000000..b2e4b28c6 --- /dev/null +++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go @@ -0,0 +1,136 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math/rand" + "mime/multipart" + "net/http" + "os" + "strings" + "sync" + "time" +) + +var ( + size = flag.Int("size", 1024, "file size") + concurrency = flag.Int("c", 4, "concurrent number of uploads") + times = flag.Int("n", 1024, "repeated number of times") + fileCount = flag.Int("fileCount", 1, "number of files to write") + destination = flag.String("to", "http://localhost:8888/", "destination directory on filer") + + statsChan = make(chan stat, 8) +) + +type stat struct { + size int64 +} + +func main() { + + flag.Parse() + + data := make([]byte, *size) + println("data len", len(data)) + + var wg sync.WaitGroup + for x := 0; x < *concurrency; x++ { + wg.Add(1) + + go func(x int) { + defer wg.Done() + + client := &http.Client{Transport: &http.Transport{ + MaxConnsPerHost: 1024, + MaxIdleConnsPerHost: 1024, + }} + r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x))) + + for t := 0; t < *times; t++ { + for f := 0; f < *fileCount; f++ { + fn := r.Intn(*fileCount) + if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%04d", fn), *destination); err == nil { + statsChan <- stat{ + size: size, + } + } else { + log.Fatalf("client %d upload %d times: %v", x, t, err) + } + } + } + }(x) + } + + go func() { + ticker := time.NewTicker(1000 * time.Millisecond) + + var lastTime time.Time + var counter, size int64 + for { + select { + case stat := <-statsChan: + size += stat.size + counter++ + case x := <-ticker.C: + if !lastTime.IsZero() { + elapsed := x.Sub(lastTime).Seconds() + fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n", + float64(counter)/elapsed, + float64(size/1024/1024)/elapsed) + } + lastTime = x + size = 0 + counter = 0 + } + } + }() + + wg.Wait() + +} + +func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) { + + if !strings.HasSuffix(destination, "/") { + destination = destination + "/" + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + part, err := writer.CreateFormFile("file", filename) + if err != nil { + return 0, fmt.Errorf("fail to create form %v: %v", filename, err) + } + + part.Write(data) + + err = writer.Close() + if err != nil { + return 0, fmt.Errorf("fail to write part %v: %v", filename, err) + } + + uri := destination + filename + + request, err := http.NewRequest("POST", uri, body) + request.Header.Set("Content-Type", writer.FormDataContentType()) + // request.Close = true // can not use this, which do not reuse http connection, impacting filer->volume also. + + resp, err := client.Do(request) + if err != nil { + return 0, fmt.Errorf("http POST %s: %v", uri, err) + } else { + body := &bytes.Buffer{} + _, err := body.ReadFrom(resp.Body) + if err != nil { + return 0, fmt.Errorf("read http POST %s response: %v", uri, err) + } + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } + + return int64(len(data)), nil +} diff --git a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go new file mode 100644 index 000000000..8b986b546 --- /dev/null +++ b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go @@ -0,0 +1,150 @@ +package main + +import ( + "bytes" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math/rand" + "mime/multipart" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" +) + +var ( + dir = flag.String("dir", ".", "upload files under this directory") + concurrency = flag.Int("c", 1, "concurrent number of uploads") + times = flag.Int("n", 1, "repeated number of times") + destination = flag.String("to", "http://localhost:8888/", "destination directory on filer") + + statsChan = make(chan stat, 8) +) + +type stat struct { + size int64 +} + +func main() { + + flag.Parse() + + var fileNames []string + + files, err := ioutil.ReadDir(*dir) + if err != nil { + log.Fatalf("fail to read dir %v: %v", *dir, err) + } + + for _, file := range files { + if file.IsDir() { + continue + } + fileNames = append(fileNames, filepath.Join(*dir, file.Name())) + } + + var wg sync.WaitGroup + for x := 0; x < *concurrency; x++ { + wg.Add(1) + + client := &http.Client{} + + go func() { + defer wg.Done() + rand.Shuffle(len(fileNames), func(i, j int) { + fileNames[i], fileNames[j] = fileNames[j], fileNames[i] + }) + for t := 0; t < *times; t++ { + for _, filename := range fileNames { + if size, err := uploadFileToFiler(client, filename, *destination); err == nil { + statsChan <- stat{ + size: size, + } + } + } + } + }() + } + + go func() { + ticker := time.NewTicker(500 * time.Millisecond) + + var lastTime time.Time + var counter, size int64 + for { + select { + case stat := <-statsChan: + size += stat.size + counter++ + case x := <-ticker.C: + if !lastTime.IsZero() { + elapsed := x.Sub(lastTime).Seconds() + fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n", + float64(counter)/elapsed, + float64(size/1024/1024)/elapsed) + } + lastTime = x + size = 0 + counter = 0 + } + } + }() + + wg.Wait() + +} + +func uploadFileToFiler(client *http.Client, filename, destination string) (size int64, err error) { + file, err := os.Open(filename) + if err != nil { + panic(err) + } + defer file.Close() + + fi, err := file.Stat() + + if !strings.HasSuffix(destination, "/") { + destination = destination + "/" + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + part, err := writer.CreateFormFile("file", file.Name()) + if err != nil { + return 0, fmt.Errorf("fail to create form %v: %v", file.Name(), err) + } + _, err = io.Copy(part, file) + if err != nil { + return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err) + } + + err = writer.Close() + if err != nil { + return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err) + } + + uri := destination + file.Name() + + request, err := http.NewRequest("POST", uri, body) + request.Header.Set("Content-Type", writer.FormDataContentType()) + + resp, err := client.Do(request) + if err != nil { + return 0, fmt.Errorf("http POST %s: %v", uri, err) + } else { + body := &bytes.Buffer{} + _, err := body.ReadFrom(resp.Body) + if err != nil { + return 0, fmt.Errorf("read http POST %s response: %v", uri, err) + } + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + } + + return fi.Size(), nil +} diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go new file mode 100644 index 000000000..e93f1cc13 --- /dev/null +++ b/unmaintained/volume_tailer/volume_tailer.go @@ -0,0 +1,69 @@ +package main + +import ( + "flag" + "log" + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + util2 "github.com/chrislusf/seaweedfs/weed/util" + "golang.org/x/tools/godoc/util" +) + +var ( + master = flag.String("master", "localhost:9333", "master server host and port") + volumeId = flag.Int("volumeId", -1, "a volume id") + rewindDuration = flag.Duration("rewind", -1, "rewind back in time. -1 means from the first entry. 0 means from now.") + timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds") + showTextFile = flag.Bool("showTextFile", false, "display textual file content") +) + +func main() { + flag.Parse() + + util2.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util2.GetViper(), "grpc.client") + + vid := needle.VolumeId(*volumeId) + + var sinceTimeNs int64 + if *rewindDuration == 0 { + sinceTimeNs = time.Now().UnixNano() + } else if *rewindDuration == -1 { + sinceTimeNs = 0 + } else if *rewindDuration > 0 { + sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano() + } + + err := operation.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) { + if n.Size == 0 { + println("-", n.String()) + return nil + } else { + println("+", n.String()) + } + + if *showTextFile { + + data := n.Data + if n.IsCompressed() { + if data, err = util2.DecompressData(data); err != nil { + return err + } + } + if util.IsText(data) { + println(string(data)) + } + + println("-", n.String(), "compressed", n.IsCompressed(), "original size", len(data)) + } + return nil + }) + + if err != nil { + log.Printf("Error VolumeTailSender volume %d: %v", vid, err) + } + +} |
