aboutsummaryrefslogtreecommitdiff
path: root/weed/command/benchmark.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/benchmark.go')
-rw-r--r--weed/command/benchmark.go62
1 files changed, 51 insertions, 11 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 26be1fe3a..e85ab1b9b 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -15,11 +15,11 @@ import (
"sync"
"time"
- "github.com/spf13/viper"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
@@ -41,6 +41,7 @@ type BenchmarkOptions struct {
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
+ grpcRead *bool
}
var (
@@ -65,6 +66,7 @@ func init() {
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
sharedBytes = make([]byte, 1024)
}
@@ -109,7 +111,7 @@ var (
func runBenchmark(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- b.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
+ b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
if *b.maxCpu < 1 {
@@ -125,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", 0, strings.Split(*b.masters, ","))
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
@@ -279,23 +281,61 @@ func readFiles(fileIdLineChan chan string, s *stat) {
fmt.Printf("reading file %s\n", fid)
}
start := time.Now()
- url, err := b.masterClient.LookupFileId(fid)
- if err != nil {
- s.failed++
- println("!!!! ", fid, " location not found!!!!!")
- continue
+ var bytesRead int
+ var err error
+ if *b.grpcRead {
+ volumeServer, err := b.masterClient.LookupVolumeServer(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
+ } else {
+ url, err := b.masterClient.LookupFileId(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ var bytes []byte
+ bytes, err = util.Get(url)
+ bytesRead = len(bytes)
}
- if bytesRead, err := util.Get(url); err == nil {
+ if err == nil {
s.completed++
- s.transferred += int64(len(bytesRead))
+ s.transferred += int64(bytesRead)
readStats.addSample(time.Now().Sub(start))
} else {
s.failed++
- fmt.Printf("Failed to read %s error:%v\n", url, err)
+ fmt.Printf("Failed to read %s error:%v\n", fid, err)
}
}
}
+func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
+ err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ fileGetClient, err := client.FileGet(context.Background(), &volume_server_pb.FileGetRequest{FileId: fid})
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, respErr := fileGetClient.Recv()
+ if resp != nil {
+ bytesRead += len(resp.Data)
+ }
+ if respErr != nil {
+ if respErr == io.EOF {
+ return nil
+ }
+ return respErr
+ }
+ }
+ })
+ return
+}
+
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {