aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient/volume_tcp_client.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/wdclient/volume_tcp_client.go')
-rw-r--r--weed/wdclient/volume_tcp_client.go97
1 files changed, 0 insertions, 97 deletions
diff --git a/weed/wdclient/volume_tcp_client.go b/weed/wdclient/volume_tcp_client.go
deleted file mode 100644
index d7ea81d64..000000000
--- a/weed/wdclient/volume_tcp_client.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package wdclient
-
-import (
- "bufio"
- "bytes"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/wdclient/net2"
- "io"
- "net"
- "time"
-)
-
-// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
-type VolumeTcpClient struct {
- cp net2.ConnectionPool
-}
-
-type VolumeTcpConn struct {
- net.Conn
- bufWriter *bufio.Writer
- bufReader *bufio.Reader
-}
-
-func NewVolumeTcpClient() *VolumeTcpClient {
- MaxIdleTime := 10 * time.Second
- return &VolumeTcpClient{
- cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{
- MaxActiveConnections: 16,
- MaxIdleConnections: 1,
- MaxIdleTime: &MaxIdleTime,
- DialMaxConcurrency: 0,
- Dial: func(network string, address string) (net.Conn, error) {
- conn, err := net.Dial(network, address)
- return &VolumeTcpConn{
- conn,
- bufio.NewWriter(conn),
- bufio.NewReader(conn),
- }, err
- },
- NowFunc: nil,
- ReadTimeout: 0,
- WriteTimeout: 0,
- }),
- }
-}
-func (c *VolumeTcpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
-
- tcpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20000)
- if parseErr != nil {
- return parseErr
- }
-
- c.cp.Register("tcp", tcpAddress)
- tcpConn, getErr := c.cp.Get("tcp", tcpAddress)
- if getErr != nil {
- return fmt.Errorf("get connection to %s: %v", tcpAddress, getErr)
- }
- conn := tcpConn.RawConn().(*VolumeTcpConn)
- defer func() {
- if err != nil {
- tcpConn.DiscardConnection()
- } else {
- tcpConn.ReleaseConnection()
- }
- }()
-
- buf := []byte("+" + fileId + "\n")
- _, err = conn.bufWriter.Write([]byte(buf))
- if err != nil {
- return
- }
- util.Uint32toBytes(buf[0:4], fileSize)
- _, err = conn.bufWriter.Write(buf[0:4])
- if err != nil {
- return
- }
- _, err = io.Copy(conn.bufWriter, fileReader)
- if err != nil {
- return
- }
- conn.bufWriter.Write([]byte("!\n"))
- conn.bufWriter.Flush()
-
- ret, _, err := conn.bufReader.ReadLine()
- if err != nil {
- glog.V(0).Infof("upload by tcp: %v", err)
- return
- }
- if !bytes.HasPrefix(ret, []byte("+OK")) {
- glog.V(0).Infof("upload by tcp: %v", string(ret))
- }
-
- return nil
-}