diff options
Diffstat (limited to 'go/weed/benchmark.go')
| -rw-r--r-- | go/weed/benchmark.go | 277 |
1 files changed, 149 insertions, 128 deletions
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index fec8472e5..f4f0b1874 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -2,9 +2,6 @@ package main import ( "bufio" - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/operation" - "github.com/chrislusf/weed-fs/go/util" "fmt" "io" "math" @@ -16,6 +13,10 @@ import ( "strings" "sync" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/operation" + "github.com/chrislusf/weed-fs/go/util" ) type BenchmarkOptions struct { @@ -30,11 +31,14 @@ type BenchmarkOptions struct { sequentialRead *bool collection *string cpuprofile *string + maxCpu *int vid2server map[string]string //cache for vid locations + } var ( - b BenchmarkOptions + b BenchmarkOptions + sharedBytes []byte ) func init() { @@ -50,33 +54,35 @@ func init() { b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") - b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file") + b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") + b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.vid2server = make(map[string]string) + sharedBytes = make([]byte, 1024) } var cmdBenchmark = &Command{ UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000", Short: "benchmark on writing millions of files and read out", Long: `benchmark on an empty weed file system. - + Two tests during benchmark: 1) write lots of small files to the system 2) read the files out - + The file content is mostly zero, but no compression is done. - + You can choose to only benchmark read or write. During write, the list of uploaded file ids is stored in "-list" specified file. You can also use your own list of file ids to run read test. - + Write speed and read speed will be collected. The numbers are used to get a sense of the system. Usually your network or the hard drive is the real bottleneck. - + Another thing to watch is whether the volumes are evenly distributed to each volume server. Because the 7 more benchmark volumes are randomly distributed to servers with free slots, it's highly possible some servers have uneven amount of - benchmark volumes. To remedy this, you can use this to grow the benchmark volumes + benchmark volumes. To remedy this, you can use this to grow the benchmark volumes before starting the benchmark command: http://localhost:9333/vol/grow?collection=benchmark&count=5 @@ -87,18 +93,17 @@ var cmdBenchmark = &Command{ } var ( - wait sync.WaitGroup - writeStats *stats - readStats *stats - serverLimitChan map[string]chan bool + wait sync.WaitGroup + writeStats *stats + readStats *stats ) -func init() { - serverLimitChan = make(map[string]chan bool) -} - func runbenchmark(cmd *Command, args []string) bool { fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + if *b.maxCpu < 1 { + *b.maxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*b.maxCpu) if *b.cpuprofile != "" { f, err := os.Create(*b.cpuprofile) if err != nil { @@ -122,12 +127,12 @@ func runbenchmark(cmd *Command, args []string) bool { func bench_write() { fileIdLineChan := make(chan string) finishChan := make(chan bool) - writeStats = newStats() + writeStats = newStats(*b.concurrency) idChan := make(chan int) - wait.Add(*b.concurrency) go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) for i := 0; i < *b.concurrency; i++ { - go writeFiles(idChan, fileIdLineChan, writeStats) + wait.Add(1) + go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i]) } writeStats.start = time.Now() writeStats.total = *b.numberOfFiles @@ -138,28 +143,30 @@ func bench_write() { close(idChan) wait.Wait() writeStats.end = time.Now() - wait.Add(1) + wait.Add(2) finishChan <- true finishChan <- true - close(finishChan) wait.Wait() + close(finishChan) writeStats.printStats() } func bench_read() { fileIdLineChan := make(chan string) finishChan := make(chan bool) - readStats = newStats() - wait.Add(*b.concurrency) + readStats = newStats(*b.concurrency) go readFileIds(*b.idListFile, fileIdLineChan) readStats.start = time.Now() readStats.total = *b.numberOfFiles go readStats.checkProgress("Randomly Reading Benchmark", finishChan) for i := 0; i < *b.concurrency; i++ { - go readFiles(fileIdLineChan, readStats) + wait.Add(1) + go readFiles(fileIdLineChan, &readStats.localStats[i]) } wait.Wait() + wait.Add(1) finishChan <- true + wait.Wait() close(finishChan) readStats.end = time.Now() readStats.printStats() @@ -170,126 +177,102 @@ type delayedFile struct { fp *operation.FilePart } -func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { +func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { + defer wait.Done() delayedDeleteChan := make(chan *delayedFile, 100) var waitForDeletions sync.WaitGroup for i := 0; i < 7; i++ { + waitForDeletions.Add(1) go func() { - waitForDeletions.Add(1) + defer waitForDeletions.Done() for df := range delayedDeleteChan { - if df == nil { - break - } if df.enterTime.After(time.Now()) { time.Sleep(df.enterTime.Sub(time.Now())) } - fp := df.fp - serverLimitChan[fp.Server] <- true - if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil { + if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil { s.completed++ } else { s.failed++ } - <-serverLimitChan[fp.Server] } - waitForDeletions.Done() }() } - for { - if id, ok := <-idChan; ok { - start := time.Now() - fileSize := int64(*b.fileSize + rand.Intn(64)) - fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} - if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { - fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection - if _, ok := serverLimitChan[fp.Server]; !ok { - serverLimitChan[fp.Server] = make(chan bool, 7) - } - serverLimitChan[fp.Server] <- true - if _, err := fp.Upload(0, *b.server); err == nil { - if rand.Intn(100) < *b.deletePercentage { - s.total++ - delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} - } else { - fileIdLineChan <- fp.Fid - } - s.completed++ - s.transferred += fileSize + for id := range idChan { + start := time.Now() + fileSize := int64(*b.fileSize + rand.Intn(64)) + fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} + if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil { + fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection + if _, err := fp.Upload(0, *b.server); err == nil { + if rand.Intn(100) < *b.deletePercentage { + s.total++ + delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} } else { - s.failed++ - } - writeStats.addSample(time.Now().Sub(start)) - <-serverLimitChan[fp.Server] - if *cmdBenchmark.IsDebug { - fmt.Printf("writing %d file %s\n", id, fp.Fid) + fileIdLineChan <- fp.Fid } + s.completed++ + s.transferred += fileSize } else { s.failed++ - println("writing file error:", err.Error()) + fmt.Printf("Failed to write with error:%v\n", err) + } + writeStats.addSample(time.Now().Sub(start)) + if *cmdBenchmark.IsDebug { + fmt.Printf("writing %d file %s\n", id, fp.Fid) } } else { - break + s.failed++ + println("writing file error:", err.Error()) } } close(delayedDeleteChan) waitForDeletions.Wait() - wait.Done() } -func readFiles(fileIdLineChan chan string, s *stats) { - serverLimitChan := make(map[string]chan bool) +func readFiles(fileIdLineChan chan string, s *stat) { + defer wait.Done() masterLimitChan := make(chan bool, 1) - for { - if fid, ok := <-fileIdLineChan; ok { - if len(fid) == 0 { - continue - } - if fid[0] == '#' { - continue - } - if *cmdBenchmark.IsDebug { - fmt.Printf("reading file %s\n", fid) - } - parts := strings.SplitN(fid, ",", 2) - vid := parts[0] - start := time.Now() - if server, ok := b.vid2server[vid]; !ok { - masterLimitChan <- true - if _, now_ok := b.vid2server[vid]; !now_ok { - if ret, err := operation.Lookup(*b.server, vid); err == nil { - if len(ret.Locations) > 0 { - server = ret.Locations[0].PublicUrl - b.vid2server[vid] = server - } + for fid := range fileIdLineChan { + if len(fid) == 0 { + continue + } + if fid[0] == '#' { + continue + } + if *cmdBenchmark.IsDebug { + fmt.Printf("reading file %s\n", fid) + } + parts := strings.SplitN(fid, ",", 2) + vid := parts[0] + start := time.Now() + if server, ok := b.vid2server[vid]; !ok { + masterLimitChan <- true + if _, now_ok := b.vid2server[vid]; !now_ok { + if ret, err := operation.Lookup(*b.server, vid); err == nil { + if len(ret.Locations) > 0 { + server = ret.Locations[0].PublicUrl + b.vid2server[vid] = server } } - <-masterLimitChan } - if server, ok := b.vid2server[vid]; ok { - if _, ok := serverLimitChan[server]; !ok { - serverLimitChan[server] = make(chan bool, 7) - } - serverLimitChan[server] <- true - url := "http://" + server + "/" + fid - if bytesRead, err := util.Get(url); err == nil { - s.completed++ - s.transferred += int64(len(bytesRead)) - readStats.addSample(time.Now().Sub(start)) - } else { - s.failed++ - println("!!!! Failed to read from ", url, " !!!!!") - } - <-serverLimitChan[server] + <-masterLimitChan + } + if server, ok := b.vid2server[vid]; ok { + url := "http://" + server + "/" + fid + if bytesRead, err := util.Get(url); err == nil { + s.completed++ + s.transferred += int64(len(bytesRead)) + readStats.addSample(time.Now().Sub(start)) } else { s.failed++ - println("!!!! volume id ", vid, " location not found!!!!!") + fmt.Printf("Failed to read %s error:%v\n", url, err) } } else { - break + s.failed++ + println("!!!! volume id ", vid, " location not found!!!!!") } } - wait.Done() } func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { @@ -353,20 +336,28 @@ const ( // An efficient statics collecting and rendering type stats struct { - data []int - overflow []int + data []int + overflow []int + localStats []stat + start time.Time + end time.Time + total int +} +type stat struct { completed int failed int total int transferred int64 - start time.Time - end time.Time } var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100} -func newStats() *stats { - return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)} +func newStats(n int) *stats { + return &stats{ + data: make([]int, benchResolution), + overflow: make([]int, 0), + localStats: make([]stat, n), + } } func (s *stats) addSample(d time.Duration) { @@ -387,28 +378,41 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) { for { select { case <-finishChan: + wait.Done() return case t := <-ticker: - completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime) + completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total + for _, localStat := range s.localStats { + completed += localStat.completed + transferred += localStat.transferred + total += localStat.total + } fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n", - s.completed, s.total, float64(s.completed)*100/float64(s.total), - float64(completed)*float64(int64(time.Second))/float64(int64(taken)), - float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024), + completed, total, float64(completed)*100/float64(total), + float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)), + float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024), ) - lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t + lastCompleted, lastTransferred, lastTime = completed, transferred, t } } } func (s *stats) printStats() { + completed, failed, transferred, total := 0, 0, int64(0), s.total + for _, localStat := range s.localStats { + completed += localStat.completed + failed += localStat.failed + transferred += localStat.transferred + total += localStat.total + } timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000 fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency) fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken) - fmt.Printf("Complete requests: %d\n", s.completed) - fmt.Printf("Failed requests: %d\n", s.failed) - fmt.Printf("Total transferred: %d bytes\n", s.transferred) - fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken) - fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken) + fmt.Printf("Complete requests: %d\n", completed) + fmt.Printf("Failed requests: %d\n", failed) + fmt.Printf("Total transferred: %d bytes\n", transferred) + fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken) + fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken) n, sum := 0, 0 min, max := 10000000, 0 for i := 0; i < len(s.data); i++ { @@ -496,15 +500,32 @@ func (l *FakeReader) Read(p []byte) (n int, err error) { } else { n = len(p) } - for i := 0; i < n-8; i += 8 { - for s := uint(0); s < 8; s++ { - p[i] = byte(l.id >> (s * 8)) + if n >= 8 { + for i := 0; i < 8; i++ { + p[i] = byte(l.id >> uint(i*8)) } } l.size -= int64(n) return } +func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) { + size := int(l.size) + bufferSize := len(sharedBytes) + for size > 0 { + tempBuffer := sharedBytes + if size < bufferSize { + tempBuffer = sharedBytes[0:size] + } + count, e := w.Write(tempBuffer) + if e != nil { + return int64(size), e + } + size -= count + } + return l.size, nil +} + func Readln(r *bufio.Reader) ([]byte, error) { var ( isPrefix bool = true |
