aboutsummaryrefslogtreecommitdiff
path: root/weed/command/benchmark.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/benchmark.go')
-rw-r--r--weed/command/benchmark.go71
1 files changed, 55 insertions, 16 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 26be1fe3a..4fedb55f1 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -2,7 +2,6 @@ package command
import (
"bufio"
- "context"
"fmt"
"io"
"math"
@@ -15,7 +14,6 @@ import (
"sync"
"time"
- "github.com/spf13/viper"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -37,10 +35,13 @@ type BenchmarkOptions struct {
sequentialRead *bool
collection *string
replication *string
+ diskType *string
cpuprofile *string
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
+ fsync *bool
+ useTcp *bool
}
var (
@@ -63,8 +64,11 @@ func init() {
b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
+ b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
+ b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
sharedBytes = make([]byte, 1024)
}
@@ -109,9 +113,9 @@ var (
func runBenchmark(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
+ fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
*b.maxCpu = runtime.NumCPU()
}
@@ -125,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
@@ -221,25 +225,37 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
+ volumeTcpClient := wdclient.NewVolumeTcpClient()
+
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
fp := &operation.FilePart{
- Reader: &FakeReader{id: uint64(id), size: fileSize},
+ Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
FileSize: fileSize,
MimeType: "image/bench", // prevent gzip benchmark content
+ Fsync: *b.fsync,
}
ar := &operation.VolumeAssignRequest{
Count: 1,
Collection: *b.collection,
Replication: *b.replication,
+ DiskType: *b.diskType,
}
- if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
+ if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
- if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
+ if *b.useTcp {
+ if uploadByTcp(volumeTcpClient, fp) {
+ fileIdLineChan <- fp.Fid
+ s.completed++
+ s.transferred += fileSize
+ } else {
+ s.failed++
+ }
+ } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@@ -279,19 +295,29 @@ func readFiles(fileIdLineChan chan string, s *stat) {
fmt.Printf("reading file %s\n", fid)
}
start := time.Now()
- url, err := b.masterClient.LookupFileId(fid)
+ var bytesRead int
+ var err error
+ urls, err := b.masterClient.LookupFileId(fid)
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
continue
}
- if bytesRead, err := util.Get(url); err == nil {
+ var bytes []byte
+ for _, url := range urls {
+ bytes, _, err = util.Get(url)
+ if err == nil {
+ break
+ }
+ }
+ bytesRead = len(bytes)
+ if err == nil {
s.completed++
- s.transferred += int64(len(bytesRead))
+ s.transferred += int64(bytesRead)
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- fmt.Printf("Failed to read %s error:%v\n", url, err)
+ fmt.Printf("Failed to read %s error:%v\n", fid, err)
}
}
}
@@ -315,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b
}
}
+func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
+
+ err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
+ if err != nil {
+ glog.Errorf("upload chunk err: %v", err)
+ return false
+ }
+
+ return true
+}
+
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {
@@ -353,7 +390,7 @@ func readFileIds(fileName string, fileIdLineChan chan string) {
}
const (
- benchResolution = 10000 //0.1 microsecond
+ benchResolution = 10000 // 0.1 microsecond
benchBucket = 1000000000 / benchResolution
)
@@ -476,7 +513,7 @@ func (s *stats) printStats() {
fmt.Printf("\nConnection Times (ms)\n")
fmt.Printf(" min avg max std\n")
fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
- //printing percentiles
+ // printing percentiles
fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
percentiles := make([]int, len(percentages))
for i := 0; i < len(percentages); i++ {
@@ -510,8 +547,9 @@ func (s *stats) printStats() {
// a fake reader to generate content to upload
type FakeReader struct {
- id uint64 // an id number
- size int64 // max bytes
+ id uint64 // an id number
+ size int64 // max bytes
+ random *rand.Rand
}
func (l *FakeReader) Read(p []byte) (n int, err error) {
@@ -527,6 +565,7 @@ func (l *FakeReader) Read(p []byte) (n int, err error) {
for i := 0; i < 8; i++ {
p[i] = byte(l.id >> uint(i*8))
}
+ l.random.Read(p[8:])
}
l.size -= int64(n)
return