aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/benchmark.go25
-rw-r--r--weed/wdclient/volume_tcp_client.go97
2 files changed, 121 insertions, 1 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index c1bc80c42..af0793c70 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -41,6 +41,7 @@ type BenchmarkOptions struct {
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
fsync *bool
+ useTcp *bool
}
var (
@@ -67,6 +68,7 @@ func init() {
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
+ b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
sharedBytes = make([]byte, 1024)
}
@@ -223,6 +225,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
+ volumeTcpClient := wdclient.NewVolumeTcpClient()
+
for id := range idChan {
start := time.Now()
fileSize := int64(*b.fileSize + random.Intn(64))
@@ -243,7 +247,15 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
- if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
+ if *b.useTcp {
+ if uploadByTcp(volumeTcpClient, fp) {
+ fileIdLineChan <- fp.Fid
+ s.completed++
+ s.transferred += fileSize
+ } else {
+ s.failed++
+ }
+ } else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
@@ -329,6 +341,17 @@ func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan b
}
}
+func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePart) bool {
+
+ err := volumeTcpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
+ if err != nil {
+ glog.Errorf("upload chunk err: %v", err)
+ return false
+ }
+
+ return true
+}
+
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {
diff --git a/weed/wdclient/volume_tcp_client.go b/weed/wdclient/volume_tcp_client.go
new file mode 100644
index 000000000..afebd71eb
--- /dev/null
+++ b/weed/wdclient/volume_tcp_client.go
@@ -0,0 +1,97 @@
+package wdclient
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
+ "io"
+ "net"
+ "time"
+)
+
+// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
+type VolumeTcpClient struct {
+ cp net2.ConnectionPool
+}
+
+type VolumeTcpConn struct {
+ net.Conn
+ bufWriter *bufio.Writer
+ bufReader *bufio.Reader
+}
+
+func NewVolumeTcpClient() *VolumeTcpClient {
+ MaxIdleTime := 10 * time.Second
+ return &VolumeTcpClient{
+ cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{
+ MaxActiveConnections: 16,
+ MaxIdleConnections: 1,
+ MaxIdleTime: &MaxIdleTime,
+ DialMaxConcurrency: 0,
+ Dial: func(network string, address string) (net.Conn, error) {
+ conn, err := net.Dial(network, address)
+ return &VolumeTcpConn{
+ conn,
+ bufio.NewWriter(conn),
+ bufio.NewReader(conn),
+ }, err
+ },
+ NowFunc: nil,
+ ReadTimeout: 0,
+ WriteTimeout: 0,
+ }),
+ }
+}
+func (c *VolumeTcpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
+
+ tcpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20000)
+ if parseErr != nil {
+ return parseErr
+ }
+
+ c.cp.Register("tcp", tcpAddress)
+ tcpConn, getErr := c.cp.Get("tcp", tcpAddress)
+ if getErr != nil {
+ return fmt.Errorf("get connection to %s: %v", tcpAddress, getErr)
+ }
+ conn := tcpConn.RawConn().(*VolumeTcpConn)
+ defer func() {
+ if err != nil {
+ tcpConn.DiscardConnection()
+ } else {
+ tcpConn.ReleaseConnection()
+ }
+ }()
+
+ buf := []byte("+" + fileId + "\n")
+ _, err = conn.bufWriter.Write([]byte(buf))
+ if err != nil {
+ return
+ }
+ util.Uint32toBytes(buf[0:4], fileSize)
+ _, err = conn.bufWriter.Write(buf[0:4])
+ if err != nil {
+ return
+ }
+ _, err = io.Copy(conn.bufWriter, fileReader)
+ if err != nil {
+ return
+ }
+ conn.bufWriter.Write([]byte("!\n"))
+ conn.bufWriter.Flush()
+
+ ret, _, err := conn.bufReader.ReadLine()
+ if err != nil {
+ glog.V(0).Infof("upload by tcp: %v", err)
+ return
+ }
+ if !bytes.HasPrefix(ret, []byte("+OK")) {
+ glog.V(0).Infof("upload by tcp: %v", string(ret))
+ }
+
+ return nil
+}