diff options
Diffstat (limited to 'weed/wdclient/volume_udp_client.go')
| -rw-r--r-- | weed/wdclient/volume_udp_client.go | 40 |
1 files changed, 16 insertions, 24 deletions
diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go index d0365f110..e8a5fa34d 100644 --- a/weed/wdclient/volume_udp_client.go +++ b/weed/wdclient/volume_udp_client.go @@ -3,18 +3,22 @@ package wdclient import ( "bufio" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/udptransfer" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient/penet" "io" "net" + "time" ) // VolumeUdpClient put/get/delete file chunks directly on volume servers without replication type VolumeUdpClient struct { + Conn net.Conn + bufWriter *bufio.Writer + bufReader *bufio.Reader } type VolumeUdpConn struct { - net.Conn + Conn net.Conn bufWriter *bufio.Writer bufReader *bufio.Reader } @@ -30,41 +34,29 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string return parseErr } - listener, err := udptransfer.NewEndpoint(&udptransfer.Params{ - LocalAddr: "", - Bandwidth: 100, - FastRetransmit: true, - FlatTraffic: true, - IsServ: false, - }) - if err != nil { - return err + if c.Conn == nil { + c.Conn, err = penet.DialTimeout("", udpAddress, 500*time.Millisecond) + if err != nil { + return err + } + c.bufWriter = bufio.NewWriter(c.Conn) } - defer listener.Close() - - conn, err := listener.Dial(udpAddress) - if err != nil { - return err - } - defer conn.Close() - - bufWriter := bufio.NewWriter(conn) buf := []byte("+" + fileId + "\n") - _, err = bufWriter.Write([]byte(buf)) + _, err = c.bufWriter.Write([]byte(buf)) if err != nil { return } util.Uint32toBytes(buf[0:4], fileSize) - _, err = bufWriter.Write(buf[0:4]) + _, err = c.bufWriter.Write(buf[0:4]) if err != nil { return } - _, err = io.Copy(bufWriter, fileReader) + _, err = io.Copy(c.bufWriter, fileReader) if err != nil { return } - bufWriter.Flush() + c.bufWriter.Flush() return nil } |
