diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/command/benchmark.go | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/command/benchmark.go')
| -rw-r--r-- | weed/command/benchmark.go | 74 |
1 files changed, 60 insertions, 14 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 382e7c850..de44fac75 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -19,6 +19,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/wdclient" @@ -40,6 +41,8 @@ type BenchmarkOptions struct { maxCpu *int grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient + grpcRead *bool + fsync *bool } var ( @@ -64,6 +67,8 @@ func init() { b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read") + b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") sharedBytes = make([]byte, 1024) } @@ -110,7 +115,7 @@ func runBenchmark(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH) + fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) if *b.maxCpu < 1 { *b.maxCpu = runtime.NumCPU() } @@ -124,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ",")) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, strings.Split(*b.masters, ",")) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() @@ -224,9 +229,10 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) fp := &operation.FilePart{ - Reader: &FakeReader{id: uint64(id), size: fileSize}, + Reader: &FakeReader{id: uint64(id), size: fileSize, random: random}, FileSize: fileSize, MimeType: "image/bench", // prevent gzip benchmark content + Fsync: *b.fsync, } ar := &operation.VolumeAssignRequest{ Count: 1, @@ -238,7 +244,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if !isSecure && assignResult.Auth != "" { isSecure = true } - if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil { + if _, err := fp.Upload(0, b.masterClient.GetMaster(), false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -278,23 +284,61 @@ func readFiles(fileIdLineChan chan string, s *stat) { fmt.Printf("reading file %s\n", fid) } start := time.Now() - url, err := b.masterClient.LookupFileId(fid) - if err != nil { - s.failed++ - println("!!!! ", fid, " location not found!!!!!") - continue + var bytesRead int + var err error + if *b.grpcRead { + volumeServer, err := b.masterClient.LookupVolumeServer(fid) + if err != nil { + s.failed++ + println("!!!! ", fid, " location not found!!!!!") + continue + } + bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption) + } else { + url, err := b.masterClient.LookupFileId(fid) + if err != nil { + s.failed++ + println("!!!! ", fid, " location not found!!!!!") + continue + } + var bytes []byte + bytes, err = util.Get(url) + bytesRead = len(bytes) } - if bytesRead, err := util.Get(url); err == nil { + if err == nil { s.completed++ - s.transferred += int64(len(bytesRead)) + s.transferred += int64(bytesRead) readStats.addSample(time.Now().Sub(start)) } else { s.failed++ - fmt.Printf("Failed to read %s error:%v\n", url, err) + fmt.Printf("Failed to read %s error:%v\n", fid, err) } } } +func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) { + err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid}) + if err != nil { + return err + } + + for { + resp, respErr := fileGetClient.Recv() + if resp != nil { + bytesRead += len(resp.Data) + } + if respErr != nil { + if respErr == io.EOF { + return nil + } + return respErr + } + } + }) + return +} + func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { @@ -509,8 +553,9 @@ func (s *stats) printStats() { // a fake reader to generate content to upload type FakeReader struct { - id uint64 // an id number - size int64 // max bytes + id uint64 // an id number + size int64 // max bytes + random *rand.Rand } func (l *FakeReader) Read(p []byte) (n int, err error) { @@ -526,6 +571,7 @@ func (l *FakeReader) Read(p []byte) (n int, err error) { for i := 0; i < 8; i++ { p[i] = byte(l.id >> uint(i*8)) } + l.random.Read(p[8:]) } l.size -= int64(n) return |
