diff options
Diffstat (limited to 'weed/command/benchmark.go')
| -rw-r--r-- | weed/command/benchmark.go | 121 |
1 files changed, 90 insertions, 31 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 8b65c8663..e85ab1b9b 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -15,8 +15,11 @@ import ( "sync" "time" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" @@ -33,19 +36,22 @@ type BenchmarkOptions struct { read *bool sequentialRead *bool collection *string + replication *string cpuprofile *string maxCpu *int - secretKey *string + grpcDialOption grpc.DialOption + masterClient *wdclient.MasterClient + grpcRead *bool } var ( - b BenchmarkOptions - sharedBytes []byte - masterClient *wdclient.MasterClient + b BenchmarkOptions + sharedBytes []byte + isSecure bool ) func init() { - cmdBenchmark.Run = runbenchmark // break init cycle + cmdBenchmark.Run = runBenchmark // break init cycle cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information") b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location") b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes") @@ -57,14 +63,15 @@ func init() { b.read = cmdBenchmark.Flag.Bool("read", true, "enable read") b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file") b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection") + b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") + b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read") sharedBytes = make([]byte, 1024) } var cmdBenchmark = &Command{ - UsageLine: "benchmark -server=localhost:9333 -c=10 -n=100000", + UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000", Short: "benchmark on writing millions of files and read out", Long: `benchmark on an empty SeaweedFS file system. @@ -101,7 +108,11 @@ var ( readStats *stats ) -func runbenchmark(cmd *Command, args []string) bool { +func runBenchmark(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) if *b.maxCpu < 1 { *b.maxCpu = runtime.NumCPU() @@ -116,22 +127,22 @@ func runbenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - masterClient = wdclient.NewMasterClient(context.Background(), "benchmark", strings.Split(*b.masters, ",")) - go masterClient.KeepConnectedToMaster() - masterClient.WaitUntilConnected() + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", 0, strings.Split(*b.masters, ",")) + go b.masterClient.KeepConnectedToMaster() + b.masterClient.WaitUntilConnected() if *b.write { - bench_write() + benchWrite() } if *b.read { - bench_read() + benchRead() } return true } -func bench_write() { +func benchWrite() { fileIdLineChan := make(chan string) finishChan := make(chan bool) writeStats = newStats(*b.concurrency) @@ -158,7 +169,7 @@ func bench_write() { writeStats.printStats() } -func bench_read() { +func benchRead() { fileIdLineChan := make(chan string) finishChan := make(chan bool) readStats = newStats(*b.concurrency) @@ -188,7 +199,6 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { defer wait.Done() delayedDeleteChan := make(chan *delayedFile, 100) var waitForDeletions sync.WaitGroup - secret := security.Secret(*b.secretKey) for i := 0; i < 7; i++ { waitForDeletions.Add(1) @@ -198,8 +208,11 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if df.enterTime.After(time.Now()) { time.Sleep(df.enterTime.Sub(time.Now())) } - if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid, - security.GenJwt(secret, df.fp.Fid)); e == nil { + var jwtAuthorization security.EncodedJwt + if isSecure { + jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid) + } + if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil { s.completed++ } else { s.failed++ @@ -213,14 +226,22 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { for id := range idChan { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) - fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize} + fp := &operation.FilePart{ + Reader: &FakeReader{id: uint64(id), size: fileSize}, + FileSize: fileSize, + MimeType: "image/bench", // prevent gzip benchmark content + } ar := &operation.VolumeAssignRequest{ - Count: 1, - Collection: *b.collection, + Count: 1, + Collection: *b.collection, + Replication: *b.replication, } - if assignResult, err := operation.Assign(masterClient.GetMaster(), ar); err == nil { + if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil { fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection - if _, err := fp.Upload(0, masterClient.GetMaster(), secret); err == nil { + if !isSecure && assignResult.Auth != "" { + isSecure = true + } + if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -260,23 +281,61 @@ func readFiles(fileIdLineChan chan string, s *stat) { fmt.Printf("reading file %s\n", fid) } start := time.Now() - url, err := masterClient.LookupFileId(fid) - if err != nil { - s.failed++ - println("!!!! ", fid, " location not found!!!!!") - continue + var bytesRead int + var err error + if *b.grpcRead { + volumeServer, err := b.masterClient.LookupVolumeServer(fid) + if err != nil { + s.failed++ + println("!!!! ", fid, " location not found!!!!!") + continue + } + bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption) + } else { + url, err := b.masterClient.LookupFileId(fid) + if err != nil { + s.failed++ + println("!!!! ", fid, " location not found!!!!!") + continue + } + var bytes []byte + bytes, err = util.Get(url) + bytesRead = len(bytes) } - if bytesRead, err := util.Get(url); err == nil { + if err == nil { s.completed++ - s.transferred += int64(len(bytesRead)) + s.transferred += int64(bytesRead) readStats.addSample(time.Now().Sub(start)) } else { s.failed++ - fmt.Printf("Failed to read %s error:%v\n", url, err) + fmt.Printf("Failed to read %s error:%v\n", fid, err) } } } +func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) { + err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid}) + if err != nil { + return err + } + + for { + resp, respErr := fileGetClient.Recv() + if resp != nil { + bytesRead += len(resp.Data) + } + if respErr != nil { + if respErr == io.EOF { + return nil + } + return respErr + } + } + }) + return +} + func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { |
