aboutsummaryrefslogtreecommitdiff
path: root/unmaintained
diff options
context:
space:
mode:
Diffstat (limited to 'unmaintained')
-rw-r--r--unmaintained/change_superblock/change_superblock.go13
-rw-r--r--unmaintained/check_disk_size/check_disk_size.go42
-rw-r--r--unmaintained/compact_leveldb/compact_leveldb.go39
-rw-r--r--unmaintained/fix_dat/fix_dat.go32
-rw-r--r--unmaintained/load_test/load_test_leveldb/load_test_leveldb.go155
-rw-r--r--unmaintained/remove_duplicate_fids/remove_duplicate_fids.go95
-rw-r--r--unmaintained/repeated_vacuum/repeated_vacuum.go62
-rw-r--r--unmaintained/see_dat/see_dat.go22
-rw-r--r--unmaintained/see_dat/see_dat_gzip.go83
-rw-r--r--unmaintained/see_idx/see_idx.go7
-rw-r--r--unmaintained/see_log_entry/see_log_entry.go75
-rw-r--r--unmaintained/see_meta/see_meta.go68
-rw-r--r--unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go136
-rw-r--r--unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go150
-rw-r--r--unmaintained/volume_tailer/volume_tailer.go69
15 files changed, 1001 insertions, 47 deletions
diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go
index 779580a9b..afe651c4e 100644
--- a/unmaintained/change_superblock/change_superblock.go
+++ b/unmaintained/change_superblock/change_superblock.go
@@ -8,7 +8,9 @@ import (
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
var (
@@ -46,9 +48,10 @@ func main() {
if err != nil {
glog.Fatalf("Open Volume Data File [ERROR]: %v", err)
}
- defer datFile.Close()
+ datBackend := backend.NewDiskFile(datFile)
+ defer datBackend.Close()
- superBlock, err := storage.ReadSuperBlock(datFile)
+ superBlock, err := super_block.ReadSuperBlock(datBackend)
if err != nil {
glog.Fatalf("cannot parse existing super block: %v", err)
@@ -60,7 +63,7 @@ func main() {
hasChange := false
if *targetReplica != "" {
- replica, err := storage.NewReplicaPlacementFromString(*targetReplica)
+ replica, err := super_block.NewReplicaPlacementFromString(*targetReplica)
if err != nil {
glog.Fatalf("cannot parse target replica %s: %v", *targetReplica, err)
@@ -73,7 +76,7 @@ func main() {
}
if *targetTTL != "" {
- ttl, err := storage.ReadTTL(*targetTTL)
+ ttl, err := needle.ReadTTL(*targetTTL)
if err != nil {
glog.Fatalf("cannot parse target ttl %s: %v", *targetTTL, err)
diff --git a/unmaintained/check_disk_size/check_disk_size.go b/unmaintained/check_disk_size/check_disk_size.go
new file mode 100644
index 000000000..4a8b92b88
--- /dev/null
+++ b/unmaintained/check_disk_size/check_disk_size.go
@@ -0,0 +1,42 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "runtime"
+ "syscall"
+)
+
+var (
+ dir = flag.String("dir", ".", "the directory which uses a disk")
+)
+
+func main() {
+ flag.Parse()
+
+ fillInDiskStatus(*dir)
+
+ fmt.Printf("OS: %v\n", runtime.GOOS)
+ fmt.Printf("Arch: %v\n", runtime.GOARCH)
+
+}
+
+func fillInDiskStatus(dir string) {
+ fs := syscall.Statfs_t{}
+ err := syscall.Statfs(dir, &fs)
+ if err != nil {
+ fmt.Printf("failed to statfs on %s: %v\n", dir, err)
+ return
+ }
+ fmt.Printf("statfs: %+v\n", fs)
+ fmt.Println()
+
+ total := fs.Blocks * uint64(fs.Bsize)
+ free := fs.Bfree * uint64(fs.Bsize)
+ fmt.Printf("Total: %d blocks x %d block size = %d bytes\n", fs.Blocks, uint64(fs.Bsize), total)
+ fmt.Printf("Free : %d blocks x %d block size = %d bytes\n", fs.Bfree, uint64(fs.Bsize), free)
+ fmt.Printf("Used : %d blocks x %d block size = %d bytes\n", fs.Blocks-fs.Bfree, uint64(fs.Bsize), total-free)
+ fmt.Printf("Free Percentage : %.2f%%\n", float32((float64(free)/float64(total))*100))
+ fmt.Printf("Used Percentage : %.2f%%\n", float32((float64(total-free)/float64(total))*100))
+ return
+}
diff --git a/unmaintained/compact_leveldb/compact_leveldb.go b/unmaintained/compact_leveldb/compact_leveldb.go
new file mode 100644
index 000000000..9be5697de
--- /dev/null
+++ b/unmaintained/compact_leveldb/compact_leveldb.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+ "flag"
+ "log"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/util"
+)
+
+var (
+ dir = flag.String("dir", ".", "data directory to store leveldb files")
+)
+
+func main() {
+
+ flag.Parse()
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10,
+ OpenFilesCacheCapacity: -1,
+ }
+
+ db, err := leveldb.OpenFile(*dir, opts)
+ if errors.IsCorrupted(err) {
+ db, err = leveldb.RecoverFile(*dir, opts)
+ }
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer db.Close()
+ if err := db.CompactRange(util.Range{}); err != nil {
+ log.Fatal(err)
+ }
+}
diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go
index 9eb64b3b4..d6110d870 100644
--- a/unmaintained/fix_dat/fix_dat.go
+++ b/unmaintained/fix_dat/fix_dat.go
@@ -9,7 +9,9 @@ import (
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -43,11 +45,13 @@ func main() {
glog.Fatalf("Read Volume Index %v", err)
}
defer indexFile.Close()
- datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644)
+ datFileName := path.Join(*fixVolumePath, fileName+".dat")
+ datFile, err := os.OpenFile(datFileName, os.O_RDONLY, 0644)
if err != nil {
glog.Fatalf("Read Volume Data %v", err)
}
- defer datFile.Close()
+ datBackend := backend.NewDiskFile(datFile)
+ defer datBackend.Close()
newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed"))
if err != nil {
@@ -55,21 +59,21 @@ func main() {
}
defer newDatFile.Close()
- superBlock, err := storage.ReadSuperBlock(datFile)
+ superBlock, err := super_block.ReadSuperBlock(datBackend)
if err != nil {
glog.Fatalf("Read Volume Data superblock %v", err)
}
newDatFile.Write(superBlock.Bytes())
- iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) {
+ iterateEntries(datBackend, indexFile, func(n *needle.Needle, offset int64) {
fmt.Printf("needle id=%v name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize)
- _, s, _, e := n.Append(newDatFile, superBlock.Version())
+ _, s, _, e := n.Append(datBackend, superBlock.Version)
fmt.Printf("size %d error %v\n", s, e)
})
}
-func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needle, offset int64)) {
+func iterateEntries(datBackend backend.BackendStorageFile, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) {
// start to read index file
var readerOffset int64
bytes := make([]byte, 16)
@@ -77,14 +81,14 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl
readerOffset += int64(count)
// start to read dat file
- superBlock, err := storage.ReadSuperBlock(datFile)
+ superBlock, err := super_block.ReadSuperBlock(datBackend)
if err != nil {
fmt.Printf("cannot read dat file super block: %v", err)
return
}
offset := int64(superBlock.BlockSize())
- version := superBlock.Version()
- n, rest, err := storage.ReadNeedleHeader(datFile, version, offset)
+ version := superBlock.Version
+ n, _, rest, err := needle.ReadNeedleHeader(datBackend, version, offset)
if err != nil {
fmt.Printf("cannot read needle header: %v", err)
return
@@ -106,7 +110,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl
fmt.Printf("key: %d offsetFromIndex %d n.Size %d sizeFromIndex:%d\n", key, offsetFromIndex, n.Size, sizeFromIndex)
- rest = storage.NeedleBodyLength(sizeFromIndex, version)
+ rest = needle.NeedleBodyLength(sizeFromIndex, version)
func() {
defer func() {
@@ -114,7 +118,7 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl
fmt.Println("Recovered in f", r)
}
}()
- if err = n.ReadNeedleBody(datFile, version, offset+int64(types.NeedleEntrySize), rest); err != nil {
+ if _, err = n.ReadNeedleBody(datBackend, version, offset+int64(types.NeedleHeaderSize), rest); err != nil {
fmt.Printf("cannot read needle body: offset %d body %d %v\n", offset, rest, err)
}
}()
@@ -124,9 +128,9 @@ func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needl
}
visitNeedle(n, offset)
- offset += types.NeedleEntrySize + rest
+ offset += types.NeedleHeaderSize + rest
//fmt.Printf("==> new entry offset %d\n", offset)
- if n, rest, err = storage.ReadNeedleHeader(datFile, version, offset); err != nil {
+ if n, _, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
if err == io.EOF {
return
}
diff --git a/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go b/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go
new file mode 100644
index 000000000..43dfb0e21
--- /dev/null
+++ b/unmaintained/load_test/load_test_leveldb/load_test_leveldb.go
@@ -0,0 +1,155 @@
+package load_test_leveldb
+
+import (
+ "crypto/md5"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "math/rand"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+)
+
+var (
+ dir = flag.String("dir", "./t", "directory to store level db files")
+ useHash = flag.Bool("isHash", false, "hash the path as the key")
+ dbCount = flag.Int("dbCount", 1, "the number of leveldb")
+)
+
+func main() {
+
+ flag.Parse()
+
+ totalTenants := 300
+ totalYears := 3
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+
+ var dbs []*leveldb.DB
+ var chans []chan string
+ for d := 0; d < *dbCount; d++ {
+ dbFolder := fmt.Sprintf("%s/%02d", *dir, d)
+ os.MkdirAll(dbFolder, 0755)
+ db, err := leveldb.OpenFile(dbFolder, opts)
+ if err != nil {
+ log.Printf("filer store open dir %s: %v", *dir, err)
+ return
+ }
+ dbs = append(dbs, db)
+ chans = append(chans, make(chan string, 1024))
+ }
+
+ var wg sync.WaitGroup
+ for d := 0; d < *dbCount; d++ {
+ wg.Add(1)
+ go func(d int) {
+ defer wg.Done()
+
+ ch := chans[d]
+ db := dbs[d]
+
+ for p := range ch {
+ if *useHash {
+ insertAsHash(db, p)
+ } else {
+ insertAsFullPath(db, p)
+ }
+ }
+ }(d)
+ }
+
+ counter := int64(0)
+ lastResetTime := time.Now()
+
+ r := rand.New(rand.NewSource(35))
+
+ for y := 0; y < totalYears; y++ {
+ for m := 0; m < 12; m++ {
+ for d := 0; d < 31; d++ {
+ for h := 0; h < 24; h++ {
+ for min := 0; min < 60; min++ {
+ for i := 0; i < totalTenants; i++ {
+ p := fmt.Sprintf("tenent%03d/%4d/%02d/%02d/%02d/%02d", i, 2015+y, 1+m, 1+d, h, min)
+
+ x := r.Intn(*dbCount)
+
+ chans[x] <- p
+
+ counter++
+ }
+
+ t := time.Now()
+ if lastResetTime.Add(time.Second).Before(t) {
+ p := fmt.Sprintf("%4d/%02d/%02d/%02d/%02d", 2015+y, 1+m, 1+d, h, min)
+ fmt.Printf("%s = %4d put/sec\n", p, counter)
+ counter = 0
+ lastResetTime = t
+ }
+ }
+ }
+ }
+ }
+ }
+
+ for d := 0; d < *dbCount; d++ {
+ close(chans[d])
+ }
+
+ wg.Wait()
+
+}
+
+func insertAsFullPath(db *leveldb.DB, p string) {
+ _, getErr := db.Get([]byte(p), nil)
+ if getErr == leveldb.ErrNotFound {
+ putErr := db.Put([]byte(p), []byte(p), nil)
+ if putErr != nil {
+ log.Printf("failed to put %s", p)
+ }
+ }
+}
+
+func insertAsHash(db *leveldb.DB, p string) {
+ key := fmt.Sprintf("%d:%s", hashToLong(p), p)
+ _, getErr := db.Get([]byte(key), nil)
+ if getErr == leveldb.ErrNotFound {
+ putErr := db.Put([]byte(key), []byte(p), nil)
+ if putErr != nil {
+ log.Printf("failed to put %s", p)
+ }
+ }
+}
+
+func hashToLong(dir string) (v int64) {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ v += int64(b[0])
+ v <<= 8
+ v += int64(b[1])
+ v <<= 8
+ v += int64(b[2])
+ v <<= 8
+ v += int64(b[3])
+ v <<= 8
+ v += int64(b[4])
+ v <<= 8
+ v += int64(b[5])
+ v <<= 8
+ v += int64(b[6])
+ v <<= 8
+ v += int64(b[7])
+
+ return
+}
diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
new file mode 100644
index 000000000..84173a663
--- /dev/null
+++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
@@ -0,0 +1,95 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "path/filepath"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+var (
+ volumePath = flag.String("dir", "/tmp", "data directory to store files")
+ volumeCollection = flag.String("collection", "", "the volume collection name")
+ volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
+)
+
+func Checksum(n *needle.Needle) string {
+ return fmt.Sprintf("%s%x", n.Id, n.Cookie)
+}
+
+type VolumeFileScanner4SeeDat struct {
+ version needle.Version
+ block super_block.SuperBlock
+
+ dir string
+ hashes map[string]bool
+ dat *os.File
+ datBackend backend.BackendStorageFile
+}
+
+func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
+ scanner.block = superBlock
+ return nil
+
+}
+func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
+ return true
+}
+
+func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+
+ if scanner.datBackend == nil {
+ newFileName := filepath.Join(*volumePath, "dat_fixed")
+ newDatFile, err := os.Create(newFileName)
+ if err != nil {
+ glog.Fatalf("Write New Volume Data %v", err)
+ }
+ scanner.datBackend = backend.NewDiskFile(newDatFile)
+ scanner.datBackend.WriteAt(scanner.block.Bytes(), 0)
+ }
+
+ checksum := Checksum(n)
+
+ if scanner.hashes[checksum] {
+ glog.V(0).Infof("duplicate checksum:%s fid:%d,%s%x @ offset:%d", checksum, *volumeId, n.Id, n.Cookie, offset)
+ return nil
+ }
+ scanner.hashes[checksum] = true
+
+ _, s, _, e := n.Append(scanner.datBackend, scanner.version)
+ fmt.Printf("size %d error %v\n", s, e)
+
+ return nil
+}
+
+func main() {
+ flag.Parse()
+
+ vid := needle.VolumeId(*volumeId)
+
+ outpath, _ := filepath.Abs(filepath.Dir(os.Args[0]))
+
+ scanner := &VolumeFileScanner4SeeDat{
+ dir: filepath.Join(outpath, "out"),
+ hashes: map[string]bool{},
+ }
+
+ if _, err := os.Stat(scanner.dir); err != nil {
+ if err := os.MkdirAll(scanner.dir, os.ModePerm); err != nil {
+ glog.Fatalf("could not create output dir : %s", err)
+ }
+ }
+
+ err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner)
+ if err != nil {
+ glog.Fatalf("Reading Volume File [ERROR] %s\n", err)
+ }
+
+}
diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go
index 7cc583f56..12ac42dbe 100644
--- a/unmaintained/repeated_vacuum/repeated_vacuum.go
+++ b/unmaintained/repeated_vacuum/repeated_vacuum.go
@@ -1,45 +1,73 @@
package main
import (
- "bytes"
"flag"
"fmt"
"log"
"math/rand"
+ "time"
+
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
var (
- master = flag.String("master", "127.0.0.1:9333", "the master server")
- repeat = flag.Int("n", 5, "repeat how many times")
+ master = flag.String("master", "127.0.0.1:9333", "the master server")
+ repeat = flag.Int("n", 5, "repeat how many times")
+ garbageThreshold = flag.Float64("garbageThreshold", 0.3, "garbageThreshold")
+ replication = flag.String("replication", "", "replication 000, 001, 002, etc")
)
func main() {
flag.Parse()
- for i := 0; i < *repeat; i++ {
- assignResult, err := operation.Assign(*master, &operation.VolumeAssignRequest{Count: 1})
- if err != nil {
- log.Fatalf("assign: %v", err)
+ util.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ genFile(grpcDialOption, 0)
+
+ go func() {
+ for {
+ println("vacuum threshold", *garbageThreshold)
+ _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
+ if err != nil {
+ log.Fatalf("vacuum: %v", err)
+ }
+ time.Sleep(time.Second)
}
+ }()
- data := make([]byte, 1024)
- rand.Read(data)
- reader := bytes.NewReader(data)
+ for i := 0; i < *repeat; i++ {
+ // create 2 files, and delete one of them
- targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+ assignResult, targetUrl := genFile(grpcDialOption, i)
- _, err = operation.Upload(targetUrl, fmt.Sprintf("test%d", i), reader, false, "", nil, "")
- if err != nil {
- log.Fatalf("upload: %v", err)
- }
+ util.Delete(targetUrl, string(assignResult.Auth))
- util.Delete(targetUrl, "")
+ }
- util.Get(fmt.Sprintf("http://%s/vol/vacuum", *master))
+}
+func genFile(grpcDialOption grpc.DialOption, i int) (*operation.AssignResult, string) {
+ assignResult, err := operation.Assign(*master, grpcDialOption, &operation.VolumeAssignRequest{
+ Count: 1,
+ Replication: *replication,
+ })
+ if err != nil {
+ log.Fatalf("assign: %v", err)
}
+ data := make([]byte, 1024)
+ rand.Read(data)
+
+ targetUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+
+ _, err = operation.UploadData(targetUrl, fmt.Sprintf("test%d", i), false, data, false, "bench/test", nil, assignResult.Auth)
+ if err != nil {
+ log.Fatalf("upload: %v", err)
+ }
+ return assignResult, targetUrl
}
diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go
index f79c0a6a9..17c494841 100644
--- a/unmaintained/see_dat/see_dat.go
+++ b/unmaintained/see_dat/see_dat.go
@@ -2,8 +2,13 @@ package main
import (
"flag"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "time"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
var (
@@ -13,32 +18,33 @@ var (
)
type VolumeFileScanner4SeeDat struct {
- version storage.Version
+ version needle.Version
}
-func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock storage.SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
return nil
}
func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
- return false
+ return true
}
-func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *storage.Needle, offset int64) error {
- glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie)
+func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second))
+ glog.V(0).Infof("%d,%s%x offset %d size %d(%s) cookie %x appendedAt %v",
+ *volumeId, n.Id, n.Cookie, offset, n.Size, util.BytesToHumanReadable(uint64(n.Size)), n.Cookie, t)
return nil
}
func main() {
flag.Parse()
- vid := storage.VolumeId(*volumeId)
+ vid := needle.VolumeId(*volumeId)
scanner := &VolumeFileScanner4SeeDat{}
err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner)
if err != nil {
glog.Fatalf("Reading Volume File [ERROR] %s\n", err)
}
-
}
diff --git a/unmaintained/see_dat/see_dat_gzip.go b/unmaintained/see_dat/see_dat_gzip.go
new file mode 100644
index 000000000..cec073e3f
--- /dev/null
+++ b/unmaintained/see_dat/see_dat_gzip.go
@@ -0,0 +1,83 @@
+package main
+
+import (
+ "bytes"
+ "compress/gzip"
+ "crypto/md5"
+ "flag"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "time"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type VolumeFileScanner4SeeDat struct {
+ version needle.Version
+}
+
+func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
+ return nil
+}
+
+func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
+ return true
+}
+
+var (
+ files = int64(0)
+ filebytes = int64(0)
+ diffbytes = int64(0)
+)
+
+func Compresssion(data []byte) float64 {
+ if len(data) <= 128 {
+ return 100.0
+ }
+ compressed, _ := util.GzipData(data[0:128])
+ return float64(len(compressed)*10) / 1280.0
+}
+
+func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second))
+ glog.V(0).Info("----------------------------------------------------------------------------------")
+ glog.V(0).Infof("%d,%s%x offset %d size %d(%s) cookie %x appendedAt %v hasmime[%t] mime[%s] (len: %d)",
+ *volumeId, n.Id, n.Cookie, offset, n.Size, util.BytesToHumanReadable(uint64(n.Size)), n.Cookie, t, n.HasMime(), string(n.Mime), len(n.Mime))
+ r, err := gzip.NewReader(bytes.NewReader(n.Data))
+ if err == nil {
+ buf := bytes.Buffer{}
+ h := md5.New()
+ c, _ := io.Copy(&buf, r)
+ d := buf.Bytes()
+ io.Copy(h, bytes.NewReader(d))
+ diff := (int64(n.DataSize) - int64(c))
+ diffbytes += diff
+ glog.V(0).Infof("was gzip! stored_size: %d orig_size: %d diff: %d(%d) mime:%s compression-of-128: %.2f md5: %x", n.DataSize, c, diff, diffbytes, http.DetectContentType(d), Compresssion(d), h.Sum(nil))
+ } else {
+ glog.V(0).Infof("no gzip!")
+ }
+ return nil
+}
+
+var (
+ _ = ioutil.ReadAll
+ volumePath = flag.String("dir", "/tmp", "data directory to store files")
+ volumeCollection = flag.String("collection", "", "the volume collection name")
+ volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
+)
+
+func main() {
+ flag.Parse()
+ vid := needle.VolumeId(*volumeId)
+ glog.V(0).Info("Starting")
+ scanner := &VolumeFileScanner4SeeDat{}
+ err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner)
+ if err != nil {
+ glog.Fatalf("Reading Volume File [ERROR] %s\n", err)
+ }
+}
diff --git a/unmaintained/see_idx/see_idx.go b/unmaintained/see_idx/see_idx.go
index 23ca04c2e..47cbd291b 100644
--- a/unmaintained/see_idx/see_idx.go
+++ b/unmaintained/see_idx/see_idx.go
@@ -3,12 +3,13 @@ package main
import (
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"os"
"path"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -35,8 +36,8 @@ func main() {
}
defer indexFile.Close()
- storage.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
- fmt.Printf("key:%v offset:%v size:%v\n", key, offset, size)
+ idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ fmt.Printf("key:%v offset:%v size:%v(%v)\n", key, offset, size, util.BytesToHumanReadable(uint64(size)))
return nil
})
diff --git a/unmaintained/see_log_entry/see_log_entry.go b/unmaintained/see_log_entry/see_log_entry.go
new file mode 100644
index 000000000..34965f6be
--- /dev/null
+++ b/unmaintained/see_log_entry/see_log_entry.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "os"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ logdataFile = flag.String("logdata", "", "log data file saved under "+ filer2.SystemLogDir)
+)
+
+func main() {
+ flag.Parse()
+
+ dst, err := os.OpenFile(*logdataFile, os.O_RDONLY, 0644)
+ if err != nil {
+ log.Fatalf("failed to open %s: %v", *logdataFile, err)
+ }
+ defer dst.Close()
+
+ err = walkLogEntryFile(dst)
+ if err != nil {
+ log.Fatalf("failed to visit %s: %v", *logdataFile, err)
+ }
+
+}
+
+func walkLogEntryFile(dst *os.File) error {
+
+ sizeBuf := make([]byte, 4)
+
+ for {
+ if n, err := dst.Read(sizeBuf); n != 4 {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+
+ size := util.BytesToUint32(sizeBuf)
+
+ data := make([]byte, int(size))
+
+ if n, err := dst.Read(data); n != len(data) {
+ return err
+ }
+
+ logEntry := &filer_pb.LogEntry{}
+ err := proto.Unmarshal(data, logEntry)
+ if err != nil {
+ log.Printf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+ return nil
+ }
+
+ event := &filer_pb.SubscribeMetadataResponse{}
+ err = proto.Unmarshal(logEntry.Data, event)
+ if err != nil {
+ log.Printf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return nil
+ }
+
+ fmt.Printf("event: %+v\n", event)
+
+ }
+
+}
diff --git a/unmaintained/see_meta/see_meta.go b/unmaintained/see_meta/see_meta.go
new file mode 100644
index 000000000..452badfd6
--- /dev/null
+++ b/unmaintained/see_meta/see_meta.go
@@ -0,0 +1,68 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "os"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ metaFile = flag.String("meta", "", "meta file generated via fs.meta.save")
+)
+
+func main() {
+ flag.Parse()
+
+ dst, err := os.OpenFile(*metaFile, os.O_RDONLY, 0644)
+ if err != nil {
+ log.Fatalf("failed to open %s: %v", *metaFile, err)
+ }
+ defer dst.Close()
+
+ err = walkMetaFile(dst)
+ if err != nil {
+ log.Fatalf("failed to visit %s: %v", *metaFile, err)
+ }
+
+}
+
+func walkMetaFile(dst *os.File) error {
+
+ sizeBuf := make([]byte, 4)
+
+ for {
+ if n, err := dst.Read(sizeBuf); n != 4 {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+
+ size := util.BytesToUint32(sizeBuf)
+
+ data := make([]byte, int(size))
+
+ if n, err := dst.Read(data); n != len(data) {
+ return err
+ }
+
+ fullEntry := &filer_pb.FullEntry{}
+ if err := proto.Unmarshal(data, fullEntry); err != nil {
+ return err
+ }
+
+ fmt.Fprintf(os.Stdout, "file %s %v\n", util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name), fullEntry.Entry.Attributes.String())
+ for i, chunk := range fullEntry.Entry.Chunks {
+ fmt.Fprintf(os.Stdout, " chunk %d %v\n", i+1, chunk.String())
+ }
+
+ }
+
+}
diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
new file mode 100644
index 000000000..b2e4b28c6
--- /dev/null
+++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
@@ -0,0 +1,136 @@
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "strings"
+ "sync"
+ "time"
+)
+
+var (
+ size = flag.Int("size", 1024, "file size")
+ concurrency = flag.Int("c", 4, "concurrent number of uploads")
+ times = flag.Int("n", 1024, "repeated number of times")
+ fileCount = flag.Int("fileCount", 1, "number of files to write")
+ destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")
+
+ statsChan = make(chan stat, 8)
+)
+
+type stat struct {
+ size int64
+}
+
+func main() {
+
+ flag.Parse()
+
+ data := make([]byte, *size)
+ println("data len", len(data))
+
+ var wg sync.WaitGroup
+ for x := 0; x < *concurrency; x++ {
+ wg.Add(1)
+
+ go func(x int) {
+ defer wg.Done()
+
+ client := &http.Client{Transport: &http.Transport{
+ MaxConnsPerHost: 1024,
+ MaxIdleConnsPerHost: 1024,
+ }}
+ r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x)))
+
+ for t := 0; t < *times; t++ {
+ for f := 0; f < *fileCount; f++ {
+ fn := r.Intn(*fileCount)
+ if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%04d", fn), *destination); err == nil {
+ statsChan <- stat{
+ size: size,
+ }
+ } else {
+ log.Fatalf("client %d upload %d times: %v", x, t, err)
+ }
+ }
+ }
+ }(x)
+ }
+
+ go func() {
+ ticker := time.NewTicker(1000 * time.Millisecond)
+
+ var lastTime time.Time
+ var counter, size int64
+ for {
+ select {
+ case stat := <-statsChan:
+ size += stat.size
+ counter++
+ case x := <-ticker.C:
+ if !lastTime.IsZero() {
+ elapsed := x.Sub(lastTime).Seconds()
+ fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
+ float64(counter)/elapsed,
+ float64(size/1024/1024)/elapsed)
+ }
+ lastTime = x
+ size = 0
+ counter = 0
+ }
+ }
+ }()
+
+ wg.Wait()
+
+}
+
+func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) {
+
+ if !strings.HasSuffix(destination, "/") {
+ destination = destination + "/"
+ }
+
+ body := &bytes.Buffer{}
+ writer := multipart.NewWriter(body)
+ part, err := writer.CreateFormFile("file", filename)
+ if err != nil {
+ return 0, fmt.Errorf("fail to create form %v: %v", filename, err)
+ }
+
+ part.Write(data)
+
+ err = writer.Close()
+ if err != nil {
+ return 0, fmt.Errorf("fail to write part %v: %v", filename, err)
+ }
+
+ uri := destination + filename
+
+ request, err := http.NewRequest("POST", uri, body)
+ request.Header.Set("Content-Type", writer.FormDataContentType())
+	// request.Close = true // cannot use this: it disables HTTP connection reuse, which also impacts the filer->volume connections.
+
+ resp, err := client.Do(request)
+ if err != nil {
+ return 0, fmt.Errorf("http POST %s: %v", uri, err)
+ } else {
+ body := &bytes.Buffer{}
+ _, err := body.ReadFrom(resp.Body)
+ if err != nil {
+ return 0, fmt.Errorf("read http POST %s response: %v", uri, err)
+ }
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }
+
+ return int64(len(data)), nil
+}
diff --git a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
new file mode 100644
index 000000000..8b986b546
--- /dev/null
+++ b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
@@ -0,0 +1,150 @@
+package main
+
+import (
+ "bytes"
+ "flag"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "mime/multipart"
+ "net/http"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+)
+
+var (
+ dir = flag.String("dir", ".", "upload files under this directory")
+ concurrency = flag.Int("c", 1, "concurrent number of uploads")
+ times = flag.Int("n", 1, "repeated number of times")
+ destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")
+
+ statsChan = make(chan stat, 8)
+)
+
+type stat struct {
+ size int64
+}
+
+func main() {
+
+ flag.Parse()
+
+ var fileNames []string
+
+ files, err := ioutil.ReadDir(*dir)
+ if err != nil {
+ log.Fatalf("fail to read dir %v: %v", *dir, err)
+ }
+
+ for _, file := range files {
+ if file.IsDir() {
+ continue
+ }
+ fileNames = append(fileNames, filepath.Join(*dir, file.Name()))
+ }
+
+ var wg sync.WaitGroup
+ for x := 0; x < *concurrency; x++ {
+ wg.Add(1)
+
+ client := &http.Client{}
+
+ go func() {
+ defer wg.Done()
+ rand.Shuffle(len(fileNames), func(i, j int) {
+ fileNames[i], fileNames[j] = fileNames[j], fileNames[i]
+ })
+ for t := 0; t < *times; t++ {
+ for _, filename := range fileNames {
+ if size, err := uploadFileToFiler(client, filename, *destination); err == nil {
+ statsChan <- stat{
+ size: size,
+ }
+ }
+ }
+ }
+ }()
+ }
+
+ go func() {
+ ticker := time.NewTicker(500 * time.Millisecond)
+
+ var lastTime time.Time
+ var counter, size int64
+ for {
+ select {
+ case stat := <-statsChan:
+ size += stat.size
+ counter++
+ case x := <-ticker.C:
+ if !lastTime.IsZero() {
+ elapsed := x.Sub(lastTime).Seconds()
+ fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
+ float64(counter)/elapsed,
+ float64(size/1024/1024)/elapsed)
+ }
+ lastTime = x
+ size = 0
+ counter = 0
+ }
+ }
+ }()
+
+ wg.Wait()
+
+}
+
+func uploadFileToFiler(client *http.Client, filename, destination string) (size int64, err error) {
+ file, err := os.Open(filename)
+ if err != nil {
+ panic(err)
+ }
+ defer file.Close()
+
+ fi, err := file.Stat()
+
+ if !strings.HasSuffix(destination, "/") {
+ destination = destination + "/"
+ }
+
+ body := &bytes.Buffer{}
+ writer := multipart.NewWriter(body)
+ part, err := writer.CreateFormFile("file", file.Name())
+ if err != nil {
+ return 0, fmt.Errorf("fail to create form %v: %v", file.Name(), err)
+ }
+ _, err = io.Copy(part, file)
+ if err != nil {
+ return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err)
+ }
+
+ err = writer.Close()
+ if err != nil {
+ return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err)
+ }
+
+ uri := destination + file.Name()
+
+ request, err := http.NewRequest("POST", uri, body)
+ request.Header.Set("Content-Type", writer.FormDataContentType())
+
+ resp, err := client.Do(request)
+ if err != nil {
+ return 0, fmt.Errorf("http POST %s: %v", uri, err)
+ } else {
+ body := &bytes.Buffer{}
+ _, err := body.ReadFrom(resp.Body)
+ if err != nil {
+ return 0, fmt.Errorf("read http POST %s response: %v", uri, err)
+ }
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }
+
+ return fi.Size(), nil
+}
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go
new file mode 100644
index 000000000..e93f1cc13
--- /dev/null
+++ b/unmaintained/volume_tailer/volume_tailer.go
@@ -0,0 +1,69 @@
+package main
+
+import (
+ "flag"
+ "log"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ util2 "github.com/chrislusf/seaweedfs/weed/util"
+ "golang.org/x/tools/godoc/util"
+)
+
+var (
+ master = flag.String("master", "localhost:9333", "master server host and port")
+ volumeId = flag.Int("volumeId", -1, "a volume id")
+ rewindDuration = flag.Duration("rewind", -1, "rewind back in time. -1 means from the first entry. 0 means from now.")
+ timeoutSeconds = flag.Int("timeoutSeconds", 0, "disconnect if no activity after these seconds")
+ showTextFile = flag.Bool("showTextFile", false, "display textual file content")
+)
+
+func main() {
+ flag.Parse()
+
+ util2.LoadConfiguration("security", false)
+ grpcDialOption := security.LoadClientTLS(util2.GetViper(), "grpc.client")
+
+ vid := needle.VolumeId(*volumeId)
+
+ var sinceTimeNs int64
+ if *rewindDuration == 0 {
+ sinceTimeNs = time.Now().UnixNano()
+ } else if *rewindDuration == -1 {
+ sinceTimeNs = 0
+ } else if *rewindDuration > 0 {
+ sinceTimeNs = time.Now().Add(-*rewindDuration).UnixNano()
+ }
+
+ err := operation.TailVolume(*master, grpcDialOption, vid, uint64(sinceTimeNs), *timeoutSeconds, func(n *needle.Needle) (err error) {
+ if n.Size == 0 {
+ println("-", n.String())
+ return nil
+ } else {
+ println("+", n.String())
+ }
+
+ if *showTextFile {
+
+ data := n.Data
+ if n.IsCompressed() {
+ if data, err = util2.DecompressData(data); err != nil {
+ return err
+ }
+ }
+ if util.IsText(data) {
+ println(string(data))
+ }
+
+ println("-", n.String(), "compressed", n.IsCompressed(), "original size", len(data))
+ }
+ return nil
+ })
+
+ if err != nil {
+ log.Printf("Error VolumeTailSender volume %d: %v", vid, err)
+ }
+
+}