aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorchrislusf <chrislu@Chriss-MacBook-Air.local>2014-12-04 18:30:44 -0800
committerchrislusf <chrislu@Chriss-MacBook-Air.local>2014-12-04 18:30:44 -0800
commit89fd1e4b6e586412f0692ac479b566a323296595 (patch)
treea03ff3fd756bc55608222e455a5f727643748cd9 /go
parented7b00bf0207749cb555d7d77bc35ddf5849ec37 (diff)
downloadseaweedfs-89fd1e4b6e586412f0692ac479b566a323296595.tar.xz
seaweedfs-89fd1e4b6e586412f0692ac479b566a323296595.zip
Add more thread safe counters. Tighten thread synchronization.
Diffstat (limited to 'go')
-rw-r--r--go/weed/benchmark.go79
1 files changed, 49 insertions, 30 deletions
diff --git a/go/weed/benchmark.go b/go/weed/benchmark.go
index f88d0d6a9..47b400fcd 100644
--- a/go/weed/benchmark.go
+++ b/go/weed/benchmark.go
@@ -132,12 +132,12 @@ func runbenchmark(cmd *Command, args []string) bool {
func bench_write() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
- writeStats = newStats()
+ writeStats = newStats(*b.concurrency)
idChan := make(chan int)
- wait.Add(*b.concurrency)
go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
for i := 0; i < *b.concurrency; i++ {
- go writeFiles(idChan, fileIdLineChan, writeStats)
+ wait.Add(1)
+ go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
}
writeStats.start = time.Now()
writeStats.total = *b.numberOfFiles
@@ -150,7 +150,6 @@ func bench_write() {
writeStats.end = time.Now()
wait.Add(1)
finishChan <- true
- finishChan <- true
close(finishChan)
wait.Wait()
writeStats.printStats()
@@ -159,14 +158,14 @@ func bench_write() {
func bench_read() {
fileIdLineChan := make(chan string)
finishChan := make(chan bool)
- readStats = newStats()
- wait.Add(*b.concurrency)
+ readStats = newStats(*b.concurrency)
go readFileIds(*b.idListFile, fileIdLineChan)
readStats.start = time.Now()
readStats.total = *b.numberOfFiles
go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
for i := 0; i < *b.concurrency; i++ {
- go readFiles(fileIdLineChan, readStats)
+ wait.Add(1)
+ go readFiles(fileIdLineChan, &readStats.localStats[i])
}
wait.Wait()
finishChan <- true
@@ -180,12 +179,13 @@ type delayedFile struct {
fp *operation.FilePart
}
-func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
+func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
+ defer wait.Done()
delayedDeleteChan := make(chan *delayedFile, 100)
var waitForDeletions sync.WaitGroup
for i := 0; i < 7; i++ {
+ waitForDeletions.Add(1)
go func() {
- waitForDeletions.Add(1)
for df := range delayedDeleteChan {
if df == nil {
break
@@ -228,6 +228,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
s.transferred += fileSize
} else {
s.failed++
+ fmt.Printf("Failed to write with error:%v\n", err)
}
writeStats.addSample(time.Now().Sub(start))
<-serverLimitChan[fp.Server]
@@ -244,10 +245,10 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stats) {
}
close(delayedDeleteChan)
waitForDeletions.Wait()
- wait.Done()
}
-func readFiles(fileIdLineChan chan string, s *stats) {
+func readFiles(fileIdLineChan chan string, s *stat) {
+ defer wait.Done()
serverLimitChan := make(map[string]chan bool)
masterLimitChan := make(chan bool, 1)
for {
@@ -288,7 +289,7 @@ func readFiles(fileIdLineChan chan string, s *stats) {
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- println("!!!! Failed to read from ", url, " !!!!!")
+ fmt.Printf("Failed to read %s error:%v\n", url, err)
}
<-serverLimitChan[server]
} else {
@@ -299,7 +300,6 @@ func readFiles(fileIdLineChan chan string, s *stats) {
break
}
}
- wait.Done()
}
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
@@ -363,20 +363,28 @@ const (
// An efficient statics collecting and rendering
type stats struct {
- data []int
- overflow []int
+ data []int
+ overflow []int
+ localStats []stat
+ start time.Time
+ end time.Time
+ total int
+}
+type stat struct {
completed int
failed int
total int
transferred int64
- start time.Time
- end time.Time
}
var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
-func newStats() *stats {
- return &stats{data: make([]int, benchResolution), overflow: make([]int, 0)}
+func newStats(n int) *stats {
+ return &stats{
+ data: make([]int, benchResolution),
+ overflow: make([]int, 0),
+ localStats: make([]stat, n),
+ }
}
func (s *stats) addSample(d time.Duration) {
@@ -399,26 +407,38 @@ func (s *stats) checkProgress(testName string, finishChan chan bool) {
case <-finishChan:
return
case t := <-ticker:
- completed, transferred, taken := s.completed-lastCompleted, s.transferred-lastTransferred, t.Sub(lastTime)
+ completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
+ for _, localStat := range s.localStats {
+ completed += localStat.completed
+ transferred += localStat.transferred
+ total += localStat.total
+ }
fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
- s.completed, s.total, float64(s.completed)*100/float64(s.total),
- float64(completed)*float64(int64(time.Second))/float64(int64(taken)),
- float64(transferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
+ completed, total, float64(completed)*100/float64(total),
+ float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
+ float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
)
- lastCompleted, lastTransferred, lastTime = s.completed, s.transferred, t
+ lastCompleted, lastTransferred, lastTime = completed, transferred, t
}
}
}
func (s *stats) printStats() {
+ completed, failed, transferred, total := 0, 0, int64(0), s.total
+ for _, localStat := range s.localStats {
+ completed += localStat.completed
+ failed += localStat.failed
+ transferred += localStat.transferred
+ total += localStat.total
+ }
timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
- fmt.Printf("Complete requests: %d\n", s.completed)
- fmt.Printf("Failed requests: %d\n", s.failed)
- fmt.Printf("Total transferred: %d bytes\n", s.transferred)
- fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(s.completed)/timeTaken)
- fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(s.transferred)/1024/timeTaken)
+ fmt.Printf("Complete requests: %d\n", completed)
+ fmt.Printf("Failed requests: %d\n", failed)
+ fmt.Printf("Total transferred: %d bytes\n", transferred)
+ fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
+ fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
n, sum := 0, 0
min, max := 10000000, 0
for i := 0; i < len(s.data); i++ {
@@ -544,4 +564,3 @@ func Readln(r *bufio.Reader) ([]byte, error) {
}
return ln, err
}
-