aboutsummaryrefslogtreecommitdiff
path: root/go/weed/benchmark.go
diff options
context:
space:
mode:
authorBrian McQueen <bmcquee@l-sclX1Q0DV7-M.local>2014-12-14 00:13:51 -0800
committerBrian McQueen <bmcquee@l-sclX1Q0DV7-M.local>2014-12-14 00:13:51 -0800
commita3583e4e7cdba69346397b963193eda9ed10c3a3 (patch)
tree5c984294280a16779c416a90f0f19e28cb98e7f4 /go/weed/benchmark.go
parentbd664def45925d81dfae9c7edfb244d2367170ca (diff)
parente431d4121e8da8d7fc243b29b780c2cd535a4210 (diff)
downloadseaweedfs-a3583e4e7cdba69346397b963193eda9ed10c3a3.tar.xz
seaweedfs-a3583e4e7cdba69346397b963193eda9ed10c3a3.zip
Merge branch 'master' of https://github.com/chrislusf/weed-fs
Diffstat (limited to 'go/weed/benchmark.go')
-rw-r--r--go/weed/benchmark.go277
1 files changed, 149 insertions, 128 deletions
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go
index fec8472e5..f4f0b1874 100644
--- a/go/weed/benchmark.go
+++ b/go/weed/benchmark.go
@@ -2,9 +2,6 @@ package main
import (
"bufio"
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/operation"
- "github.com/chrislusf/weed-fs/go/util"
"fmt"
"io"
"math"
@@ -16,6 +13,10 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/operation"
+ "github.com/chrislusf/weed-fs/go/util"
)
type BenchmarkOptions struct {
@@ -30,11 +31,14 @@ type BenchmarkOptions struct {
sequentialRead *bool
collection *string
cpuprofile *string
+ maxCpu *int
vid2server map[string]string //cache for vid locations
+
}
var (
- b BenchmarkOptions
+ b BenchmarkOptions
+ sharedBytes []byte
)
func init() {
@@ -50,33 +54,35 @@ func init() {
b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
- b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file")
+ b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
+ b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.vid2server = make(map[string]string)
+ sharedBytes = make([]byte, 1024)
}
var cmdBenchmark = &Command{
UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000",
Short: "benchmark on writing millions of files and read out",
Long: `benchmark on an empty weed file system.
-
+
Two tests during benchmark:
1) write lots of small files to the system
2) read the files out
-
+
The file content is mostly zero, but no compression is done.
-
+
You can choose to only benchmark read or write.
During write, the list of uploaded file ids is stored in "-list" specified file.
You can also use your own list of file ids to run read test.
-
+
Write speed and read speed will be collected.
The numbers are used to get a sense of the system.
Usually your network or the hard drive is the real bottleneck.
-
+
Another thing to watch is whether the volumes are evenly distributed
to each volume server. Because the 7 more benchmark volumes are randomly distributed
to servers with free slots, it's highly possible some servers have uneven amount of
- benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
+ benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
before starting the benchmark command:
http://localhost:9333/vol/grow?collection=benchmark&count=5
@@ -87,18 +93,17 @@ var cmdBenchmark = &Command{
}
var (
- wait sync.WaitGroup
- writeStats *stats
- readStats *stats
- serverLimitChan map[string]chan bool
+ wait sync.WaitGroup
+ writeStats *stats
+ readStats *stats
)
-func init() {
- serverLimitChan = make(map[string]chan bool)
-}
-
func runbenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ if *b.maxCpu < 1 {
+ *b.maxCpu = runtime.NumCPU()
+ }
+ runtime.GOMAXPROCS(*b.maxCpu)
if *b.cpuprofile != "" {
f, err := os.Create(*b.cpuprofile)
if err != nil {
@@ -122,12 +127,12 @@ func runbenchmark(cmd *Command, args []string) bool {
func bench_write() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
- writeStats = newStats()
+ writeStats = newStats(*b.concurrency)
idChan := make(chan int)
- wait.Add(*b.concurrency)
go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
for i := 0; i < *b.concurrency; i++ {
- go writeFiles(idChan, fileIdLineChan, writeStats)
+ wait.Add(1)
+ go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
}
writeStats.start = time.Now()
writeStats.total = *b.numberOfFiles
@@ -138,28 +143,30 @@ func bench_write() {
close(idChan)
wait.Wait()
writeStats.end = time.Now()
- wait.Add(1)
+ wait.Add(2)
finishChan <- true
finishChan <- true
- close(finishChan)
wait.Wait()
+ close(finishChan)
writeStats.printStats()
}
func bench_read() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
- readStats = newStats()
- wait.Add(*b.concurrency)
+ readStats = newStats(*b.concurrency)
go readFileIds(*b.idListFile, fileIdLineChan)
readStats.start = time.Now()
readStats.total = *b.numberOfFiles
go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
for i := 0; i < *b.concurrency; i++ {
- go readFiles(fileIdLineChan, readStats)
+ wait.Add(1)
+ go readFiles(fileIdLineChan, &readStats.localStats[i])
}
wait.Wait()
+ wait.Add(1)
finishChan <- true
+ wait.Wait()
close(finishChan)
readStats.end = time.Now()
readStats.printStats()
@@ -170,126 +177,102 @@ type delayedFile struct {
fp *operation.FilePart
}
-func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
+func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
+ defer wait.Done()
delayedDeleteChan := make(chan *delayedFile, 100)
var waitForDeletions sync.WaitGroup
for i := 0; i < 7; i++ {
+ waitForDeletions.Add(1)
go func() {
- waitForDeletions.Add(1)
+ defer waitForDeletions.Done()
for df := range delayedDeleteChan {
- if df == nil {
- break
- }
if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now()))
}
- fp := df.fp
- serverLimitChan[fp.Server] <- true
- if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil {
+ if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
s.completed++
} else {
s.failed++
}
- <-serverLimitChan[fp.Server]
}
- waitForDeletions.Done()
}()
}
- for {
- if id, ok := <-idChan; ok {
- start := time.Now()
- fileSize := int64(*b.fileSize + rand.Intn(64))
- fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
- if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
- fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
- if _, ok := serverLimitChan[fp.Server]; !ok {
- serverLimitChan[fp.Server] = make(chan bool, 7)
- }
- serverLimitChan[fp.Server] <- true
- if _, err := fp.Upload(0, *b.server); err == nil {
- if rand.Intn(100) < *b.deletePercentage {
- s.total++
- delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
- } else {
- fileIdLineChan <- fp.Fid
- }
- s.completed++
- s.transferred += fileSize
+ for id := range idChan {
+ start := time.Now()
+ fileSize := int64(*b.fileSize + rand.Intn(64))
+ fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
+ if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
+ fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
+ if _, err := fp.Upload(0, *b.server); err == nil {
+ if rand.Intn(100) < *b.deletePercentage {
+ s.total++
+ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
} else {
- s.failed++
- }
- writeStats.addSample(time.Now().Sub(start))
- <-serverLimitChan[fp.Server]
- if *cmdBenchmark.IsDebug {
- fmt.Printf("writing %d file %s\n", id, fp.Fid)
+ fileIdLineChan <- fp.Fid
}
+ s.completed++
+ s.transferred += fileSize
} else {
s.failed++
- println("writing file error:", err.Error())
+ fmt.Printf("Failed to write with error:%v\n", err)
+ }
+ writeStats.addSample(time.Now().Sub(start))
+ if *cmdBenchmark.IsDebug {
+ fmt.Printf("writing %d file %s\n", id, fp.Fid)
}
} else {
- break
+ s.failed++
+ println("writing file error:", err.Error())
}
}
close(delayedDeleteChan)
waitForDeletions.Wait()
- wait.Done()
}
-func readFiles(fileIdLineChan chan string, s *stats) {
- serverLimitChan := make(map[string]chan bool)
+func readFiles(fileIdLineChan chan string, s *stat) {
+ defer wait.Done()
masterLimitChan := make(chan bool, 1)
- for {
- if fid, ok := <-fileIdLineChan; ok {
- if len(fid) == 0 {
- continue
- }
- if fid[0] == '#' {
- continue
- }
- if *cmdBenchmark.IsDebug {
- fmt.Printf("reading file %s\n", fid)
- }
- parts := strings.SplitN(fid, ",", 2)
- vid := parts[0]
- start := time.Now()
- if server, ok := b.vid2server[vid]; !ok {
- masterLimitChan <- true
- if _, now_ok := b.vid2server[vid]; !now_ok {
- if ret, err := operation.Lookup(*b.server, vid); err == nil {
- if len(ret.Locations) > 0 {
- server = ret.Locations[0].PublicUrl
- b.vid2server[vid] = server
- }
+ for fid := range fileIdLineChan {
+ if len(fid) == 0 {
+ continue
+ }
+ if fid[0] == '#' {
+ continue
+ }
+ if *cmdBenchmark.IsDebug {
+ fmt.Printf("reading file %s\n", fid)
+ }
+ parts := strings.SplitN(fid, ",", 2)
+ vid := parts[0]
+ start := time.Now()
+ if server, ok := b.vid2server[vid]; !ok {
+ masterLimitChan <- true
+ if _, now_ok := b.vid2server[vid]; !now_ok {
+ if ret, err := operation.Lookup(*b.server, vid); err == nil {
+ if len(ret.Locations) > 0 {
+ server = ret.Locations[0].PublicUrl
+ b.vid2server[vid] = server
}
}
- <-masterLimitChan
}
- if server, ok := b.vid2server[vid]; ok {
- if _, ok := serverLimitChan[server]; !ok {
- serverLimitChan[server] = make(chan bool, 7)
- }
- serverLimitChan[server] <- true
- url := "http://" + server + "/" + fid
- if bytesRead, err := util.Get(url); err == nil {
- s.completed++
- s.transferred += int64(len(bytesRead))
- readStats.addSample(time.Now().Sub(start))
- } else {
- s.failed++
- println("!!!! Failed to read from ", url, " !!!!!")
- }
- <-serverLimitChan[server]
+ <-masterLimitChan
+ }
+ if server, ok := b.vid2server[vid]; ok {
+ url := "http://" + server + "/" + fid
+ if bytesRead, err := util.Get(url); err == nil {
+ s.completed++
+ s.transferred += int64(len(bytesRead))
+ readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- println("!!!! volume id ", vid, " location not found!!!!!")
+ fmt.Printf("Failed to read %s error:%v\n", url, err)
}
} else {
- break
+ s.failed++
+ println("!!!! volume id ", vid, " location not found!!!!!")
}
}
- wait.Done()
}
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
@@ -353,20 +336,28 @@ const (
// An efficient statics collecting and rendering
type stats struct {
- data []int
- overflow []int
+ data []int
+ overflow []int
+ localStats []stat
+ start time.Time
+ end time.Time
+ total int
+}
+type stat struct {
completed int
failed int
total int
transferred int64
- start time.Time
- end time.Time
}
var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
-func newStats() *stats {
- return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)}
+func newStats(n int) *stats {
+ return &stats{
+ data: make([]int, benchResolution),
+ overflow: make([]int, 0),
+ localStats: make([]stat, n),
+ }
}
func (s *stats) addSample(d time.Duration) {
@@ -387,28 +378,41 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) {
for {
select {
case <-finishChan:
+ wait.Done()
return
case t := <-ticker:
- completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime)
+ completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
+ for _, localStat := range s.localStats {
+ completed += localStat.completed
+ transferred += localStat.transferred
+ total += localStat.total
+ }
fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
- s.completed, s.total, float64(s.completed)*100/float64(s.total),
- float64(completed)*float64(int64(time.Second))/float64(int64(taken)),
- float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
+ completed, total, float64(completed)*100/float64(total),
+ float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
+ float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
)
- lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t
+ lastCompleted, lastTransferred, lastTime = completed, transferred, t
}
}
}
func (s *stats) printStats() {
+ completed, failed, transferred, total := 0, 0, int64(0), s.total
+ for _, localStat := range s.localStats {
+ completed += localStat.completed
+ failed += localStat.failed
+ transferred += localStat.transferred
+ total += localStat.total
+ }
timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
- fmt.Printf("Complete requests: %d\n", s.completed)
- fmt.Printf("Failed requests: %d\n", s.failed)
- fmt.Printf("Total transferred: %d bytes\n", s.transferred)
- fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken)
- fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken)
+ fmt.Printf("Complete requests: %d\n", completed)
+ fmt.Printf("Failed requests: %d\n", failed)
+ fmt.Printf("Total transferred: %d bytes\n", transferred)
+ fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
+ fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
n, sum := 0, 0
min, max := 10000000, 0
for i := 0; i < len(s.data); i++ {
@@ -496,15 +500,32 @@ func (l *FakeReader) Read(p []byte) (n int, err error) {
} else {
n = len(p)
}
- for i := 0; i < n-8; i += 8 {
- for s := uint(0); s < 8; s++ {
- p[i] = byte(l.id >> (s * 8))
+ if n >= 8 {
+ for i := 0; i < 8; i++ {
+ p[i] = byte(l.id >> uint(i*8))
}
}
l.size -= int64(n)
return
}
+func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
+ size := int(l.size)
+ bufferSize := len(sharedBytes)
+ for size > 0 {
+ tempBuffer := sharedBytes
+ if size < bufferSize {
+ tempBuffer = sharedBytes[0:size]
+ }
+ count, e := w.Write(tempBuffer)
+ if e != nil {
+ return int64(size), e
+ }
+ size -= count
+ }
+ return l.size, nil
+}
+
func Readln(r *bufio.Reader) ([]byte, error) {
var (
isPrefix bool = true