aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-06 14:26:24 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-06 14:26:27 -0800
commit1bd880dcdb870b3a5939c2a8eda7b9be483d05af (patch)
treee596fd4485cb0133cf727ec0a5bc237e4575d295 /weed/command
parent9d402ebe9f4913899ff14f904944a376dbe99439 (diff)
downloadseaweedfs-1bd880dcdb870b3a5939c2a8eda7b9be483d05af.tar.xz
seaweedfs-1bd880dcdb870b3a5939c2a8eda7b9be483d05af.zip
adds tcp writes benchmark
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/benchmark.go25
1 files changed, 24 insertions, 1 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index c1bc80c42..af0793c70 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -41,6 +41,7 @@ type BenchmarkOptions struct {
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
fsync *bool
+ useTcp *bool
}
var (
@@ -67,6 +68,7 @@ func init() {
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
+ b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
sharedBytes = make([]byte, 1024)
}
@@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
+ volumeTcpClient := wdclient.NewVolumeTcpClient()
+
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
@@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
- if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
+ if *b.useTcp {
+ if uploadByTcp(volumeTcpClient, fp) {
+ fileIdLineChan <- fp.Fid
+ s.completed++
+ s.transferred += fileSize
+ } else {
+ s.failed++
+ }
+ } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b
}
}
+func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
+
+ err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
+ if err != nil {
+ glog.Errorf("upload chunk err: %v", err)
+ return false
+ }
+
+ return true
+}
+
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {