diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-02-15 14:01:37 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-02-15 14:01:37 -0800 |
| commit | 1477eead0101ee479e92ce479dbcf38bece320db (patch) | |
| tree | b167485d19f414376f9223ef83726e1ae723a98a /weed/operation/grpc_client.go | |
| parent | c7ac94ea9a3bc2842ce41c1fcad6fc23781c1296 (diff) | |
| download | seaweedfs-1477eead0101ee479e92ce479dbcf38bece320db.tar.xz seaweedfs-1477eead0101ee479e92ce479dbcf38bece320db.zip | |
final attemptorigin/tcp_read
on par with 1K sized object, but no so good with large ones
the default http flow control is better than current implementation.
Diffstat (limited to 'weed/operation/grpc_client.go')
| -rw-r--r-- | weed/operation/grpc_client.go | 39 |
1 files changed, 24 insertions, 15 deletions
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 6820f8696..d4d6d3358 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "time" "google.golang.org/grpc" @@ -17,7 +18,7 @@ import ( ) var ( - connectionPool = make(map[string]*sync.Pool) + connectionPool = make(map[string]*util.ResourcePool) connectionPoolLock sync.Mutex ) @@ -53,41 +54,49 @@ func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) e return err } - conn := getConnection(tcpAddress) + conn, err := getConnection(tcpAddress) + if err != nil { + return err + } defer releaseConnection(conn, tcpAddress) err = fn(conn) return err } -func getConnection(tcpAddress string) net.Conn { +func getConnection(tcpAddress string) (net.Conn, error) { connectionPoolLock.Lock() defer connectionPoolLock.Unlock() pool, found := connectionPool[tcpAddress] if !found { println("creating pool for", tcpAddress) - pool = &sync.Pool{New: func() interface{} { - raddr, err := net.ResolveTCPAddr("tcp", tcpAddress) - if err != nil { - glog.Fatal(err) - } + + raddr, err := net.ResolveTCPAddr("tcp", tcpAddress) + if err != nil { + glog.Fatal(err) + } + + pool = util.NewResourcePool(16, func() (interface{}, error) { conn, err := net.DialTCP("tcp", nil, raddr) if err != nil { - glog.Errorf("failed to connect to %s: %v", tcpAddress, err) - return conn + return conn, err } conn.SetKeepAlive(true) conn.SetNoDelay(true) println("connected", tcpAddress, "=>", conn.LocalAddr().String()) - return conn - }} + return conn, nil + }) connectionPool[tcpAddress] = pool } - conn := pool.Get().(net.Conn) + + connObj, err := pool.Get(time.Minute) + if err != nil { + return nil, err + } // println("get connection", tcpAddress, "=>", conn.LocalAddr().String()) - return conn + return connObj.(net.Conn), nil } func releaseConnection(conn net.Conn, tcpAddress string) { @@ -99,7 +108,7 @@ func releaseConnection(conn net.Conn, tcpAddress string) { println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String()) return } - pool.Put(conn) + pool.Release(conn) // println("returned connection", tcpAddress, "=>", conn.LocalAddr().String()) } |
