aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go/weed/benchmark.go150
1 files changed, 62 insertions, 88 deletions
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go
index 47b400fcd..f4f0b1874 100644
--- a/go/weed/benchmark.go
+++ b/go/weed/benchmark.go
@@ -93,16 +93,11 @@ var cmdBenchmark = &Command{
}
var (
- wait sync.WaitGroup
- writeStats *stats
- readStats *stats
- serverLimitChan map[string]chan bool
+ wait sync.WaitGroup
+ writeStats *stats
+ readStats *stats
)
-func init() {
- serverLimitChan = make(map[string]chan bool)
-}
-
func runbenchmark(cmd *Command, args []string) bool {
fmt.Printf("This is Seaweed File System version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
@@ -148,10 +143,11 @@ func bench_write() {
close(idChan)
wait.Wait()
writeStats.end = time.Now()
- wait.Add(1)
+ wait.Add(2)
+ finishChan <- true
finishChan <- true
- close(finishChan)
wait.Wait()
+ close(finishChan)
writeStats.printStats()
}
@@ -168,7 +164,9 @@ func bench_read() {
go readFiles(fileIdLineChan, &readStats.localStats[i])
}
wait.Wait()
+ wait.Add(1)
finishChan <- true
+ wait.Wait()
close(finishChan)
readStats.end = time.Now()
readStats.printStats()
@@ -186,61 +184,46 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
for i := 0; i < 7; i++ {
waitForDeletions.Add(1)
go func() {
+ defer waitForDeletions.Done()
for df := range delayedDeleteChan {
- if df == nil {
- break
- }
if df.enterTime.After(time.Now()) {
time.Sleep(df.enterTime.Sub(time.Now()))
}
- fp := df.fp
- serverLimitChan[fp.Server] <- true
- if e := util.Delete("http://" + fp.Server + "/" + fp.Fid); e == nil {
+ if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
s.completed++
} else {
s.failed++
}
- <-serverLimitChan[fp.Server]
}
- waitForDeletions.Done()
}()
}
- for {
- if id, ok := <-idChan; ok {
- start := time.Now()
- fileSize := int64(*b.fileSize + rand.Intn(64))
- fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
- if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
- fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
- if _, ok := serverLimitChan[fp.Server]; !ok {
- serverLimitChan[fp.Server] = make(chan bool, 7)
- }
- serverLimitChan[fp.Server] <- true
- if _, err := fp.Upload(0, *b.server); err == nil {
- if rand.Intn(100) < *b.deletePercentage {
- s.total++
- delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
- } else {
- fileIdLineChan <- fp.Fid
- }
- s.completed++
- s.transferred += fileSize
+ for id := range idChan {
+ start := time.Now()
+ fileSize := int64(*b.fileSize + rand.Intn(64))
+ fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
+ if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
+ fp.Server, fp.Fid, fp.Collection = assignResult.PublicUrl, assignResult.Fid, *b.collection
+ if _, err := fp.Upload(0, *b.server); err == nil {
+ if rand.Intn(100) < *b.deletePercentage {
+ s.total++
+ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
} else {
- s.failed++
- fmt.Printf("Failed to write with error:%v\n", err)
- }
- writeStats.addSample(time.Now().Sub(start))
- <-serverLimitChan[fp.Server]
- if *cmdBenchmark.IsDebug {
- fmt.Printf("writing %d file %s\n", id, fp.Fid)
+ fileIdLineChan <- fp.Fid
}
+ s.completed++
+ s.transferred += fileSize
} else {
s.failed++
- println("writing file error:", err.Error())
+ fmt.Printf("Failed to write with error:%v\n", err)
+ }
+ writeStats.addSample(time.Now().Sub(start))
+ if *cmdBenchmark.IsDebug {
+ fmt.Printf("writing %d file %s\n", id, fp.Fid)
}
} else {
- break
+ s.failed++
+ println("writing file error:", err.Error())
}
}
close(delayedDeleteChan)
@@ -249,55 +232,45 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
func readFiles(fileIdLineChan chan string, s *stat) {
defer wait.Done()
- serverLimitChan := make(map[string]chan bool)
masterLimitChan := make(chan bool, 1)
- for {
- if fid, ok := <-fileIdLineChan; ok {
- if len(fid) == 0 {
- continue
- }
- if fid[0] == '#' {
- continue
- }
- if *cmdBenchmark.IsDebug {
- fmt.Printf("reading file %s\n", fid)
- }
- parts := strings.SplitN(fid, ",", 2)
- vid := parts[0]
- start := time.Now()
- if server, ok := b.vid2server[vid]; !ok {
- masterLimitChan <- true
- if _, now_ok := b.vid2server[vid]; !now_ok {
- if ret, err := operation.Lookup(*b.server, vid); err == nil {
- if len(ret.Locations) > 0 {
- server = ret.Locations[0].PublicUrl
- b.vid2server[vid] = server
- }
+ for fid := range fileIdLineChan {
+ if len(fid) == 0 {
+ continue
+ }
+ if fid[0] == '#' {
+ continue
+ }
+ if *cmdBenchmark.IsDebug {
+ fmt.Printf("reading file %s\n", fid)
+ }
+ parts := strings.SplitN(fid, ",", 2)
+ vid := parts[0]
+ start := time.Now()
+ if server, ok := b.vid2server[vid]; !ok {
+ masterLimitChan <- true
+ if _, now_ok := b.vid2server[vid]; !now_ok {
+ if ret, err := operation.Lookup(*b.server, vid); err == nil {
+ if len(ret.Locations) > 0 {
+ server = ret.Locations[0].PublicUrl
+ b.vid2server[vid] = server
}
}
- <-masterLimitChan
}
- if server, ok := b.vid2server[vid]; ok {
- if _, ok := serverLimitChan[server]; !ok {
- serverLimitChan[server] = make(chan bool, 7)
- }
- serverLimitChan[server] <- true
- url := "http://" + server + "/" + fid
- if bytesRead, err := util.Get(url); err == nil {
- s.completed++
- s.transferred += int64(len(bytesRead))
- readStats.addSample(time.Now().Sub(start))
- } else {
- s.failed++
- fmt.Printf("Failed to read %s error:%v\n", url, err)
- }
- <-serverLimitChan[server]
+ <-masterLimitChan
+ }
+ if server, ok := b.vid2server[vid]; ok {
+ url := "http://" + server + "/" + fid
+ if bytesRead, err := util.Get(url); err == nil {
+ s.completed++
+ s.transferred += int64(len(bytesRead))
+ readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- println("!!!! volume id ", vid, " location not found!!!!!")
+ fmt.Printf("Failed to read %s error:%v\n", url, err)
}
} else {
- break
+ s.failed++
+ println("!!!! volume id ", vid, " location not found!!!!!")
}
}
}
@@ -405,6 +378,7 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) {
for {
select {
case <-finishChan:
+ wait.Done()
return
case t := <-ticker:
completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total