diff options
| author | Konstantin Lebedev <lebedev_k@tochka.com> | 2021-04-06 13:50:33 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <lebedev_k@tochka.com> | 2021-04-06 13:50:33 +0500 |
| commit | 011e6e90ee8a3aeff6f845fec90331ad4714b514 (patch) | |
| tree | b661a90a1cc8c77b2085f120420b0bdd537bcf0d /weed/command/benchmark.go | |
| parent | ed79baa30fe5687a35a9a61e2dcf3b4750064d36 (diff) | |
| parent | 100ed773870b8826352f25e0cd72f60a591ecfa8 (diff) | |
| download | seaweedfs-011e6e90ee8a3aeff6f845fec90331ad4714b514.tar.xz seaweedfs-011e6e90ee8a3aeff6f845fec90331ad4714b514.zip | |
Merge branch 'upstreamMaster' into iamapipr
Diffstat (limited to 'weed/command/benchmark.go')
| -rw-r--r-- | weed/command/benchmark.go | 27 |
1 files changed, 25 insertions, 2 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index c1bc80c42..4fedb55f1 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -41,6 +41,7 @@ type BenchmarkOptions struct { grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient fsync *bool + useTcp *bool } var ( @@ -67,6 +68,7 @@ func init() { b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write") + b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp") sharedBytes = make([]byte, 1024) } @@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { random := rand.New(rand.NewSource(time.Now().UnixNano())) + volumeTcpClient := wdclient.NewVolumeTcpClient() + for id := range idChan { start := time.Now() fileSize := int64(*b.fileSize + random.Intn(64)) @@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) { if !isSecure && assignResult.Auth != "" { isSecure = true } - if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { + if *b.useTcp { + if uploadByTcp(volumeTcpClient, fp) { + fileIdLineChan <- fp.Fid + s.completed++ + s.transferred += fileSize + } else { + s.failed++ + } + } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil { if random.Intn(100) < *b.deletePercentage { s.total++ delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp} @@ -293,7 +305,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { } var bytes []byte for _, url := range urls { - bytes, _, err = util.FastGet(url) + bytes, _, err = util.Get(url) if err == nil { break } @@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b } } +func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool { + + err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader) + if err != nil { + glog.Errorf("upload chunk err: %v", err) + return false + } + + return true +} + func readFileIds(fileName string, fileIdLineChan chan string) { file, err := os.Open(fileName) // For read access. if err != nil { |
