diff options
| author | Chris Lu <chris.lu@gmail.com> | 2014-03-12 10:30:57 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2014-03-12 10:30:57 -0700 |
| commit | 054374c765bc0dea51c524bf8cb137d1ea356254 (patch) | |
| tree | 50a83ea0ecd1ef1673d2b6c40cfb4b105bcba97c /go/weed/benchmark.go | |
| parent | 466a55b06d660c85673a0e9d1a8802e9f553f269 (diff) | |
| download | seaweedfs-054374c765bc0dea51c524bf8cb137d1ea356254.tar.xz seaweedfs-054374c765bc0dea51c524bf8cb137d1ea356254.zip | |
in progress, trying to make benchmark working better to reuse http
connections.
Diffstat (limited to 'go/weed/benchmark.go')
| -rw-r--r-- | go/weed/benchmark.go | 116 |
1 files changed, 79 insertions, 37 deletions
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go index 99faf15b0..d7d80729d 100644 --- a/go/weed/benchmark.go +++ b/go/weed/benchmark.go @@ -11,6 +11,7 @@ import ( "math/rand" "os" "runtime" + "runtime/pprof" "strings" "sync" "time" @@ -26,6 +27,7 @@ type BenchmarkOptions struct { read *bool sequentialRead *bool collection *string + cpuprofile *string vid2server map[string]string //cache for vid locations } @@ -45,6 +47,8 @@ func init() { b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") + b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "write cpu profile to file") + b.vid2server = make(map[string]string) } var cmdBenchmark = &Command{ @@ -80,62 +84,86 @@ var ( ) func runbenchmark(cmd *Command, args []string) bool { - finishChan := make(chan bool) - fileIdLineChan := make(chan string) - b.vid2server = make(map[string]string) - fmt.Printf("This is Weed File System version %s %s %s\n", VERSION, runtime.GOOS, runtime.GOARCH) + if *b.cpuprofile != "" { + f, err := os.Create(*b.cpuprofile) + if err != nil { + glog.Fatal(err) + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } if *b.write { - writeStats = newStats() - idChan := make(chan int) - wait.Add(*b.concurrency) - go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) - for i := 0; i < *b.concurrency; i++ { - go writeFiles(idChan, fileIdLineChan, writeStats) - } - writeStats.start = time.Now() - go writeStats.checkProgress("Writing Benchmark", finishChan) - for i := 0; i < *b.numberOfFiles; i++ { - idChan <- i - } - close(idChan) - wait.Wait() - writeStats.end = time.Now() - wait.Add(1) - finishChan <- true - finishChan <- true - wait.Wait() - writeStats.printStats() + bench_write() } if *b.read { - readStats = newStats() - wait.Add(*b.concurrency) - go readFileIds(*b.idListFile, fileIdLineChan) - readStats.start = time.Now() - go readStats.checkProgress("Randomly Reading Benchmark", finishChan) - for i := 0; i < *b.concurrency; i++ { - go readFiles(fileIdLineChan, readStats) - } - wait.Wait() - finishChan <- true - readStats.end = time.Now() - readStats.printStats() + bench_read() } return true } +func bench_write() { + fileIdLineChan := make(chan string) + finishChan := make(chan bool) + writeStats = newStats() + idChan := make(chan int) + wait.Add(*b.concurrency) + go writeFileIds(*b.idListFile, fileIdLineChan, finishChan) + for i := 0; i < *b.concurrency; i++ { + go writeFiles(idChan, fileIdLineChan, writeStats) + } + writeStats.start = time.Now() + go writeStats.checkProgress("Writing Benchmark", finishChan) + for i := 0; i < *b.numberOfFiles; i++ { + idChan <- i + } + close(idChan) + wait.Wait() + writeStats.end = time.Now() + wait.Add(1) + finishChan <- true + finishChan <- true + close(finishChan) + wait.Wait() + writeStats.printStats() +} + +func bench_read() { + fileIdLineChan := make(chan string) + finishChan := make(chan bool) + readStats = newStats() + wait.Add(*b.concurrency) + go readFileIds(*b.idListFile, fileIdLineChan) + readStats.start = time.Now() + go readStats.checkProgress("Randomly Reading Benchmark", finishChan) + for i := 0; i < *b.concurrency; i++ { + go readFiles(fileIdLineChan, readStats) + } + wait.Wait() + finishChan <- true + close(finishChan) + readStats.end = time.Now() + readStats.printStats() +} + func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { + serverLimitChan := make(map[string]chan bool) for { if id, ok := <-idChan; ok { start := time.Now() fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: int64(*b.fileSize)}, FileSize: int64(*b.fileSize)} if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection + if _, ok := serverLimitChan[fp.Server]; !ok { + serverLimitChan[fp.Server] = make(chan bool, 7) + } + serverLimitChan[fp.Server] <- true fp.Upload(0, *b.server) writeStats.addSample(time.Now().Sub(start)) + <-serverLimitChan[fp.Server] fileIdLineChan <- fp.Fid s.transferred += int64(*b.fileSize) s.completed++ @@ -154,6 +182,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) { } func readFiles(fileIdLineChan chan string, s *stats) { + serverLimitChan := make(map[string]chan bool) + masterLimitChan := make(chan bool, 7) for { if fid, ok := <-fileIdLineChan; ok { if len(fid) == 0 { @@ -169,14 +199,20 @@ func readFiles(fileIdLineChan chan string, s *stats) { vid := parts[0] start := time.Now() if server, ok := b.vid2server[vid]; !ok { + masterLimitChan <- true if ret, err := operation.Lookup(*b.server, vid); err == nil { if len(ret.Locations) > 0 { server = ret.Locations[0].PublicUrl b.vid2server[vid] = server } } + <-masterLimitChan } if server, ok := b.vid2server[vid]; ok { + if _, ok := serverLimitChan[server]; !ok { + serverLimitChan[server] = make(chan bool, 7) + } + serverLimitChan[server] <- true url := "http://" + server + "/" + fid if bytesRead, err := util.Get(url); err == nil { s.completed++ @@ -186,6 +222,7 @@ func readFiles(fileIdLineChan chan string, s *stats) { s.failed++ println("!!!! Failed to read from ", url, " !!!!!") } + <-serverLimitChan[server] } else { s.failed++ println("!!!! volume id ", vid, " location not found!!!!!") @@ -270,7 +307,12 @@ func newStats() *stats { } func (s *stats) addSample(d time.Duration) { - s.data[int(d/benchBucket)]++ + index := int(d / benchBucket) + if 0 <= index && index < len(s.data) { + s.data[int(d/benchBucket)]++ + } else { + fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000) + } } func (s *stats) checkProgress(testName string, finishChan chan bool) { |
