aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-13 19:03:46 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-13 19:03:46 -0800
commit5495497876a29f26fcc212393d938593a4572ca9 (patch)
tree465667d2deba8b4bc7d81090f3fbff7e68a9293e
parente12600cc8eea9a36115d465d2a7511b1b30aad07 (diff)
downloadseaweedfs-5495497876a29f26fcc212393d938593a4572ca9.tar.xz
seaweedfs-5495497876a29f26fcc212393d938593a4572ca9.zip
simplify
-rw-r--r--weed/server/volume_server_udp_handlers.go1
-rw-r--r--weed/wdclient/volume_udp_client.go39
2 files changed, 2 insertions, 38 deletions
diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go
index b1f95446c..974ec78d2 100644
--- a/weed/server/volume_server_udp_handlers.go
+++ b/weed/server/volume_server_udp_handlers.go
@@ -23,6 +23,7 @@ func (vs *VolumeServer) HandleUdpConnection(c net.Conn) {
}
return
}
+ println("received", cmd)
cmd = cmd[:len(cmd)-1]
switch cmd[0] {
case '+':
diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go
index 470d7a82d..d0365f110 100644
--- a/weed/wdclient/volume_udp_client.go
+++ b/weed/wdclient/volume_udp_client.go
@@ -5,15 +5,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/udptransfer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
"io"
"net"
- "time"
)
// VolumeUdpClient put/get/delete file chunks directly on volume servers without replication
type VolumeUdpClient struct {
- cp net2.ConnectionPool
}
type VolumeUdpConn struct {
@@ -23,41 +20,7 @@ type VolumeUdpConn struct {
}
func NewVolumeUdpClient() *VolumeUdpClient {
- MaxIdleTime := 10 * time.Second
return &VolumeUdpClient{
- cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{
- MaxActiveConnections: 16,
- MaxIdleConnections: 1,
- MaxIdleTime: &MaxIdleTime,
- DialMaxConcurrency: 0,
- Dial: func(network string, address string) (net.Conn, error) {
-
- listener, err := udptransfer.NewEndpoint(&udptransfer.Params{
- LocalAddr: "",
- Bandwidth: 100,
- FastRetransmit: true,
- FlatTraffic: true,
- IsServ: false,
- })
- if err != nil {
- return nil, err
- }
-
- conn, err := listener.Dial(address)
- if err != nil {
- return nil, err
- }
- return &VolumeUdpConn{
- conn,
- bufio.NewWriter(conn),
- bufio.NewReader(conn),
- }, err
-
- },
- NowFunc: nil,
- ReadTimeout: 0,
- WriteTimeout: 0,
- }),
}
}
func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
@@ -77,6 +40,7 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string
if err != nil {
return err
}
+ defer listener.Close()
conn, err := listener.Dial(udpAddress)
if err != nil {
@@ -100,7 +64,6 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string
if err != nil {
return
}
- bufWriter.Write([]byte("!\n"))
bufWriter.Flush()
return nil