aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/operation/grpc_client.go39
-rw-r--r--weed/util/pool.go98
2 files changed, 122 insertions, 15 deletions
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index 6820f8696..d4d6d3358 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"sync"
+ "time"
"google.golang.org/grpc"
@@ -17,7 +18,7 @@ import (
)
var (
- connectionPool = make(map[string]*sync.Pool)
+ connectionPool = make(map[string]*util.ResourcePool)
connectionPoolLock sync.Mutex
)
@@ -53,41 +54,49 @@ func WithVolumeServerTcpConnection(volumeServer string, fn func(conn net.Conn) e
return err
}
- conn := getConnection(tcpAddress)
+ conn, err := getConnection(tcpAddress)
+ if err != nil {
+ return err
+ }
defer releaseConnection(conn, tcpAddress)
err = fn(conn)
return err
}
-func getConnection(tcpAddress string) net.Conn {
+func getConnection(tcpAddress string) (net.Conn, error) {
connectionPoolLock.Lock()
defer connectionPoolLock.Unlock()
pool, found := connectionPool[tcpAddress]
if !found {
println("creating pool for", tcpAddress)
- pool = &sync.Pool{New: func() interface{} {
- raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
- if err != nil {
- glog.Fatal(err)
- }
+
+ raddr, err := net.ResolveTCPAddr("tcp", tcpAddress)
+ if err != nil {
+ glog.Fatal(err)
+ }
+
+ pool = util.NewResourcePool(16, func() (interface{}, error) {
conn, err := net.DialTCP("tcp", nil, raddr)
if err != nil {
- glog.Errorf("failed to connect to %s: %v", tcpAddress, err)
- return conn
+ return conn, err
}
conn.SetKeepAlive(true)
conn.SetNoDelay(true)
println("connected", tcpAddress, "=>", conn.LocalAddr().String())
- return conn
- }}
+ return conn, nil
+ })
connectionPool[tcpAddress] = pool
}
- conn := pool.Get().(net.Conn)
+
+ connObj, err := pool.Get(time.Minute)
+ if err != nil {
+ return nil, err
+ }
// println("get connection", tcpAddress, "=>", conn.LocalAddr().String())
- return conn
+ return connObj.(net.Conn), nil
}
func releaseConnection(conn net.Conn, tcpAddress string) {
@@ -99,7 +108,7 @@ func releaseConnection(conn net.Conn, tcpAddress string) {
println("can not return connection", tcpAddress, "=>", conn.LocalAddr().String())
return
}
- pool.Put(conn)
+ pool.Release(conn)
// println("returned connection", tcpAddress, "=>", conn.LocalAddr().String())
}
diff --git a/weed/util/pool.go b/weed/util/pool.go
new file mode 100644
index 000000000..db137ec0b
--- /dev/null
+++ b/weed/util/pool.go
@@ -0,0 +1,98 @@
+package util
+
+import (
+ "errors"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+var (
+ TimeoutErr = errors.New("timeout")
+)
+
+// A bufferedChan implemented by a buffered channel
+type ResourcePool struct {
+ sync.Mutex
+ bufferedChan chan interface{}
+ poolSizeLimit int
+ inuse int
+ newFn func() (interface{}, error)
+}
+
+func NewResourcePool(poolSizeLimit int, newFn func() (interface{}, error)) *ResourcePool {
+ p := &ResourcePool{
+ poolSizeLimit: poolSizeLimit,
+ newFn: newFn,
+ bufferedChan: make(chan interface{}, poolSizeLimit),
+ }
+ return p
+}
+
+func (p *ResourcePool) Size() int {
+ p.Lock()
+ defer p.Unlock()
+ return len(p.bufferedChan) + p.inuse
+}
+
+func (p *ResourcePool) Free() int {
+ p.Lock()
+ defer p.Unlock()
+ return p.poolSizeLimit - p.inuse
+}
+
+func (p *ResourcePool) Get(timeout time.Duration) (interface{}, error) {
+ d, err := p.get(timeout)
+ if err != nil {
+ return nil, err
+ }
+ if d == nil && p.newFn != nil {
+ var err error
+ d, err = p.newFn()
+ if err != nil {
+ return nil, err
+ }
+ }
+ p.Lock()
+ defer p.Unlock()
+ p.inuse++
+ return d, nil
+}
+
+func (p *ResourcePool) Release(v interface{}) {
+
+ p.Lock()
+ defer p.Unlock()
+ if p.inuse == 0 {
+ glog.V(0).Infof("released too many times?")
+ return
+ }
+ p.bufferedChan <- v
+ p.inuse--
+}
+
+func (p *ResourcePool) get(timeout time.Duration) (interface{}, error) {
+
+ select {
+ case v := <-p.bufferedChan:
+ return v, nil
+ default:
+ }
+
+ if p.Free() > 0 {
+ d, err := p.newFn()
+ if err != nil {
+ return nil, err
+ }
+ return d, nil
+ }
+
+ // wait for an freed item
+ select {
+ case v := <-p.bufferedChan:
+ return v, nil
+ case <-time.After(timeout):
+ }
+ return nil, TimeoutErr
+}