aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-02-15 14:01:37 -0800
committerChris Lu <chris.lu@gmail.com>2020-02-15 14:01:37 -0800
commit1477eead0101ee479e92ce479dbcf38bece320db (patch)
treeb167485d19f414376f9223ef83726e1ae723a98a /weed/operation
parentc7ac94ea9a3bc2842ce41c1fcad6fc23781c1296 (diff)
downloadseaweedfs-origin/tcp_read.tar.xz
seaweedfs-origin/tcp_read.zip
final attemptorigin/tcp_read
on par with 1K sized object, but no so good with large ones the default http flow control is better than current implementation.
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/grpc_client.go39
1 files changed, 24 insertions, 15 deletions
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index 6820f8696..d4d6d3358 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
+ "time"
"google.golang.org/grpc"
@@ -17,7 +18,7 @@ import (
)
var (
- connectionPool = make(map[string]*sync.Pool)
+ connectionPool = make(map[string]*util.ResourcePool)
connectionPoolLock sync.Mutex
)
@@ -53,41 +54,49 @@ func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) e
return err
}
- conn := getConnection(tcpAddress)
+ conn, err := getConnection(tcpAddress)
+ if err != nil {
+ return err
+ }
defer releaseConnection(conn, tcpAddress)
err = fn(conn)
return err
}
-func getConnection(tcpAddress string) net.Conn {
+func getConnection(tcpAddress string) (net.Conn, error) {
connectionPoolLock.Lock()
defer connectionPoolLock.Unlock()
pool, found := connectionPool[tcpAddress]
if !found {
println("creating pool for", tcpAddress)
- pool = &sync.Pool{New: func() interface{} {
- raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
- if err != nil {
- glog.Fatal(err)
- }
+
+ raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
+ if err != nil {
+ glog.Fatal(err)
+ }
+
+ pool = util.NewResourcePool(16, func() (interface{}, error) {
conn, err := net.DialTCP("tcp", nil, raddr)
if err != nil {
- glog.Errorf("failed to connect to %s: %v", tcpAddress, err)
- return conn
+ return conn, err
}
conn.SetKeepAlive(true)
conn.SetNoDelay(true)
println("connected", tcpAddress, "=>", conn.LocalAddr().String())
- return conn
- }}
+ return conn, nil
+ })
connectionPool[tcpAddress] = pool
}
- conn := pool.Get().(net.Conn)
+
+ connObj, err := pool.Get(time.Minute)
+ if err != nil {
+ return nil, err
+ }
// println("get connection", tcpAddress, "=>", conn.LocalAddr().String())
- return conn
+ return connObj.(net.Conn), nil
}
func releaseConnection(conn net.Conn, tcpAddress string) {
@@ -99,7 +108,7 @@ func releaseConnection(conn net.Conn, tcpAddress string) {
println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String())
return
}
- pool.Put(conn)
+ pool.Release(conn)
// println("returned connection", tcpAddress, "=>", conn.LocalAddr().String())
}