aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/wdclient/net2/base_connection_pool.go159
-rw-r--r--weed/wdclient/net2/connection_pool.go97
-rw-r--r--weed/wdclient/net2/doc.go6
-rw-r--r--weed/wdclient/net2/ip.go177
-rw-r--r--weed/wdclient/net2/managed_connection.go185
-rw-r--r--weed/wdclient/net2/port.go19
-rw-r--r--weed/wdclient/resource_pool/doc.go5
-rw-r--r--weed/wdclient/resource_pool/managed_handle.go97
-rw-r--r--weed/wdclient/resource_pool/multi_resource_pool.go200
-rw-r--r--weed/wdclient/resource_pool/resource_pool.go96
-rw-r--r--weed/wdclient/resource_pool/semaphore.go154
-rw-r--r--weed/wdclient/resource_pool/simple_resource_pool.go343
12 files changed, 1538 insertions, 0 deletions
diff --git a/weed/wdclient/net2/base_connection_pool.go b/weed/wdclient/net2/base_connection_pool.go
new file mode 100644
index 000000000..5cc037d0f
--- /dev/null
+++ b/weed/wdclient/net2/base_connection_pool.go
@@ -0,0 +1,159 @@
+package net2
+
+import (
+ "net"
+ "strings"
+ "time"
+
+ rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
+)
+
+const defaultDialTimeout = 1 * time.Second
+
+func defaultDialFunc(network string, address string) (net.Conn, error) {
+ return net.DialTimeout(network, address, defaultDialTimeout)
+}
+
+func parseResourceLocation(resourceLocation string) (
+ network string,
+ address string) {
+
+ idx := strings.Index(resourceLocation, " ")
+ if idx >= 0 {
+ return resourceLocation[:idx], resourceLocation[idx+1:]
+ }
+
+ return "", resourceLocation
+}
+
+// A thin wrapper around the underlying resource pool.
+type connectionPoolImpl struct {
+ options ConnectionOptions
+
+ pool rp.ResourcePool
+}
+
+// This returns a connection pool where all connections are connected
+// to the same (network, address)
+func newBaseConnectionPool(
+ options ConnectionOptions,
+ createPool func(rp.Options) rp.ResourcePool) ConnectionPool {
+
+ dial := options.Dial
+ if dial == nil {
+ dial = defaultDialFunc
+ }
+
+ openFunc := func(loc string) (interface{}, error) {
+ network, address := parseResourceLocation(loc)
+ return dial(network, address)
+ }
+
+ closeFunc := func(handle interface{}) error {
+ return handle.(net.Conn).Close()
+ }
+
+ poolOptions := rp.Options{
+ MaxActiveHandles: options.MaxActiveConnections,
+ MaxIdleHandles: options.MaxIdleConnections,
+ MaxIdleTime: options.MaxIdleTime,
+ OpenMaxConcurrency: options.DialMaxConcurrency,
+ Open: openFunc,
+ Close: closeFunc,
+ NowFunc: options.NowFunc,
+ }
+
+ return &connectionPoolImpl{
+ options: options,
+ pool: createPool(poolOptions),
+ }
+}
+
+// This returns a connection pool where all connections are connected
+// to the same (network, address)
+func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool {
+ return newBaseConnectionPool(options, rp.NewSimpleResourcePool)
+}
+
+// This returns a connection pool that manages multiple (network, address)
+// entries. The connections to each (network, address) entry acts
+// independently. For example ("tcp", "localhost:11211") could act as memcache
+// shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1.
+func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool {
+ return newBaseConnectionPool(
+ options,
+ func(poolOptions rp.Options) rp.ResourcePool {
+ return rp.NewMultiResourcePool(poolOptions, nil)
+ })
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) NumActive() int32 {
+ return p.pool.NumActive()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) ActiveHighWaterMark() int32 {
+ return p.pool.ActiveHighWaterMark()
+}
+
+// This returns the number of alive idle connections. This method is not part
+// of ConnectionPool's API. It is used only for testing.
+func (p *connectionPoolImpl) NumIdle() int {
+ return p.pool.NumIdle()
+}
+
+// BaseConnectionPool can only register a single (network, address) entry.
+// Register should be call before any Get calls.
+func (p *connectionPoolImpl) Register(network string, address string) error {
+ return p.pool.Register(network + " " + address)
+}
+
+// BaseConnectionPool has nothing to do on Unregister.
+func (p *connectionPoolImpl) Unregister(network string, address string) error {
+ return nil
+}
+
+func (p *connectionPoolImpl) ListRegistered() []NetworkAddress {
+ result := make([]NetworkAddress, 0, 1)
+ for _, location := range p.pool.ListRegistered() {
+ network, address := parseResourceLocation(location)
+
+ result = append(
+ result,
+ NetworkAddress{
+ Network: network,
+ Address: address,
+ })
+ }
+ return result
+}
+
+// This gets an active connection from the connection pool. Note that network
+// and address arguments are ignored (The connections with point to the
+// network/address provided by the first Register call).
+func (p *connectionPoolImpl) Get(
+ network string,
+ address string) (ManagedConn, error) {
+
+ handle, err := p.pool.Get(network + " " + address)
+ if err != nil {
+ return nil, err
+ }
+ return NewManagedConn(network, address, handle, p, p.options), nil
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) Release(conn ManagedConn) error {
+ return conn.ReleaseConnection()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) Discard(conn ManagedConn) error {
+ return conn.DiscardConnection()
+}
+
+// See ConnectionPool for documentation.
+func (p *connectionPoolImpl) EnterLameDuckMode() {
+ p.pool.EnterLameDuckMode()
+}
diff --git a/weed/wdclient/net2/connection_pool.go b/weed/wdclient/net2/connection_pool.go
new file mode 100644
index 000000000..5b8d4d232
--- /dev/null
+++ b/weed/wdclient/net2/connection_pool.go
@@ -0,0 +1,97 @@
+package net2
+
+import (
+ "net"
+ "time"
+)
+
+type ConnectionOptions struct {
+ // The maximum number of connections that can be active per host at any
+ // given time (A non-positive value indicates the number of connections
+ // is unbounded).
+ MaxActiveConnections int32
+
+ // The maximum number of idle connections per host that are kept alive by
+ // the connection pool.
+ MaxIdleConnections uint32
+
+ // The maximum amount of time an idle connection can alive (if specified).
+ MaxIdleTime *time.Duration
+
+ // This limits the number of concurrent Dial calls (there's no limit when
+ // DialMaxConcurrency is non-positive).
+ DialMaxConcurrency int
+
+ // Dial specifies the dial function for creating network connections.
+ // If Dial is nil, net.DialTimeout is used, with timeout set to 1 second.
+ Dial func(network string, address string) (net.Conn, error)
+
+ // This specifies the now time function. When the function is non-nil, the
+ // connection pool will use the specified function instead of time.Now to
+ // generate the current time.
+ NowFunc func() time.Time
+
+ // This specifies the timeout for any Read() operation.
+ // Note that setting this to 0 (i.e. not setting it) will make
+ // read operations block indefinitely.
+ ReadTimeout time.Duration
+
+ // This specifies the timeout for any Write() operation.
+ // Note that setting this to 0 (i.e. not setting it) will make
+ // write operations block indefinitely.
+ WriteTimeout time.Duration
+}
+
+func (o ConnectionOptions) getCurrentTime() time.Time {
+ if o.NowFunc == nil {
+ return time.Now()
+ } else {
+ return o.NowFunc()
+ }
+}
+
+// A generic interface for managed connection pool. All connection pool
+// implementations must be threadsafe.
+type ConnectionPool interface {
+ // This returns the number of active connections that are on loan.
+ NumActive() int32
+
+ // This returns the highest number of active connections for the entire
+ // lifetime of the pool.
+ ActiveHighWaterMark() int32
+
+ // This returns the number of idle connections that are in the pool.
+ NumIdle() int
+
+ // This associates (network, address) to the connection pool; afterwhich,
+ // the user can get connections to (network, address).
+ Register(network string, address string) error
+
+ // This dissociate (network, address) from the connection pool;
+ // afterwhich, the user can no longer get connections to
+ // (network, address).
+ Unregister(network string, address string) error
+
+ // This returns the list of registered (network, address) entries.
+ ListRegistered() []NetworkAddress
+
+ // This gets an active connection from the connection pool. The connection
+ // will remain active until one of the following is called:
+ // 1. conn.ReleaseConnection()
+ // 2. conn.DiscardConnection()
+ // 3. pool.Release(conn)
+ // 4. pool.Discard(conn)
+ Get(network string, address string) (ManagedConn, error)
+
+ // This releases an active connection back to the connection pool.
+ Release(conn ManagedConn) error
+
+ // This discards an active connection from the connection pool.
+ Discard(conn ManagedConn) error
+
+ // Enter the connection pool into lame duck mode. The connection pool
+ // will no longer return connections, and all idle connections are closed
+ // immediately (including active connections that are released back to the
+ // pool afterward).
+ EnterLameDuckMode()
+}
diff --git a/weed/wdclient/net2/doc.go b/weed/wdclient/net2/doc.go
new file mode 100644
index 000000000..f4d6552e4
--- /dev/null
+++ b/weed/wdclient/net2/doc.go
@@ -0,0 +1,6 @@
+// net2 is a collection of functions meant to supplement the capabilities
+// provided by the standard "net" package.
+package net2
+
+// copied from https://github.com/dropbox/godropbox/tree/master/net2
+// removed other dependencies \ No newline at end of file
diff --git a/weed/wdclient/net2/ip.go b/weed/wdclient/net2/ip.go
new file mode 100644
index 000000000..60e46342f
--- /dev/null
+++ b/weed/wdclient/net2/ip.go
@@ -0,0 +1,177 @@
+package net2
+
+import (
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "strings"
+ "sync"
+)
+
+var myHostname string
+var myHostnameOnce sync.Once
+
+// Like os.Hostname but caches first successful result, making it cheap to call it
+// over and over.
+// It will also crash whole process if fetching Hostname fails!
+func MyHostname() string {
+ myHostnameOnce.Do(func() {
+ var err error
+ myHostname, err = os.Hostname()
+ if err != nil {
+ log.Fatal(err)
+ }
+ })
+ return myHostname
+}
+
+var myIp4 *net.IPAddr
+var myIp4Once sync.Once
+
+// Resolves `MyHostname()` to an Ip4 address. Caches first successful result, making it
+// cheap to call it over and over.
+// It will also crash whole process if resolving the IP fails!
+func MyIp4() *net.IPAddr {
+ myIp4Once.Do(func() {
+ var err error
+ myIp4, err = net.ResolveIPAddr("ip4", MyHostname())
+ if err != nil {
+ log.Fatal(err)
+ }
+ })
+ return myIp4
+}
+
+var myIp6 *net.IPAddr
+var myIp6Once sync.Once
+
+// Resolves `MyHostname()` to an Ip6 address. Caches first successful result, making it
+// cheap to call it over and over.
+// It will also crash whole process if resolving the IP fails!
+func MyIp6() *net.IPAddr {
+ myIp6Once.Do(func() {
+ var err error
+ myIp6, err = net.ResolveIPAddr("ip6", MyHostname())
+ if err != nil {
+ log.Fatal(err)
+ }
+ })
+ return myIp6
+}
+
+// This returns the list of local ip addresses which other hosts can connect
+// to (NOTE: Loopback ip is ignored).
+// Also resolves Hostname to an address and adds it to the list too, so
+// IPs from /etc/hosts can work too.
+func GetLocalIPs() ([]*net.IP, error) {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return nil, fmt.Errorf("Failed to lookup hostname: %v", err)
+ }
+ // Resolves IP Address from Hostname, this way overrides in /etc/hosts
+ // can work too for IP resolution.
+ ipInfo, err := net.ResolveIPAddr("ip4", hostname)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to resolve ip: %v", err)
+ }
+ ips := []*net.IP{&ipInfo.IP}
+
+ // TODO(zviad): Is rest of the code really necessary?
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ return nil, fmt.Errorf( "Failed to get interface addresses: %v", err)
+ }
+ for _, addr := range addrs {
+ ipnet, ok := addr.(*net.IPNet)
+ if !ok {
+ continue
+ }
+
+ if ipnet.IP.IsLoopback() {
+ continue
+ }
+
+ ips = append(ips, &ipnet.IP)
+ }
+ return ips, nil
+}
+
+var localhostIPNets []*net.IPNet
+
+func init() {
+ for _, mask := range []string{"127.0.0.1/8", "::1/128"} {
+ _, ipnet, err := net.ParseCIDR(mask)
+ if err != nil {
+ panic(err)
+ }
+ localhostIPNets = append(localhostIPNets, ipnet)
+ }
+}
+
+func IsLocalhostIp(ipStr string) bool {
+ ip := net.ParseIP(ipStr)
+ if ip == nil {
+ return false
+ }
+ for _, ipnet := range localhostIPNets {
+ if ipnet.Contains(ip) {
+ return true
+ }
+ }
+ return false
+}
+
+// Given a host string, return true if the host is an ip (v4/v6) localhost.
+func IsLocalhost(host string) bool {
+ return IsLocalhostIp(host) ||
+ host == "localhost" ||
+ host == "ip6-localhost" ||
+ host == "ipv6-localhost"
+}
+
+// Resolves hostnames in addresses to actual IP4 addresses. Skips all invalid addresses
+// and all addresses that can't be resolved.
+// `addrs` are assumed to be of form: ["<hostname>:<port>", ...]
+// Returns an error in addition to resolved addresses if not all resolutions succeed.
+func ResolveIP4s(addrs []string) ([]string, error) {
+ resolvedAddrs := make([]string, 0, len(addrs))
+ var lastErr error
+
+ for _, server := range addrs {
+ hostPort := strings.Split(server, ":")
+ if len(hostPort) != 2 {
+ lastErr = fmt.Errorf("Skipping invalid address: %s", server)
+ continue
+ }
+
+ ip, err := net.ResolveIPAddr("ip4", hostPort[0])
+ if err != nil {
+ lastErr = err
+ continue
+ }
+ resolvedAddrs = append(resolvedAddrs, ip.IP.String()+":"+hostPort[1])
+ }
+ return resolvedAddrs, lastErr
+}
+
+func LookupValidAddrs() (map[string]bool, error) {
+ hostName, err := os.Hostname()
+ if err != nil {
+ return nil, err
+ }
+ addrs, err := net.LookupHost(hostName)
+ if err != nil {
+ return nil, err
+ }
+ validAddrs := make(map[string]bool)
+ validAddrs[hostName] = true
+ for _, addr := range addrs {
+ validAddrs[addr] = true
+ }
+ // Special case localhost/127.0.0.1 so that this works on devVMs. It should
+ // have no affect in production.
+ validAddrs["127.0.0.1"] = true
+ validAddrs["localhost"] = true
+ return validAddrs, nil
+}
diff --git a/weed/wdclient/net2/managed_connection.go b/weed/wdclient/net2/managed_connection.go
new file mode 100644
index 000000000..a886210d1
--- /dev/null
+++ b/weed/wdclient/net2/managed_connection.go
@@ -0,0 +1,185 @@
+package net2
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "errors"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
+)
+
+// Dial's arguments.
+type NetworkAddress struct {
+ Network string
+ Address string
+}
+
+// A connection managed by a connection pool. NOTE: SetDeadline,
+// SetReadDeadline and SetWriteDeadline are disabled for managed connections.
+// (The deadlines are set by the connection pool).
+type ManagedConn interface {
+ net.Conn
+
+ // This returns the original (network, address) entry used for creating
+ // the connection.
+ Key() NetworkAddress
+
+ // This returns the underlying net.Conn implementation.
+ RawConn() net.Conn
+
+ // This returns the connection pool which owns this connection.
+ Owner() ConnectionPool
+
+ // This indictes a user is done with the connection and releases the
+ // connection back to the connection pool.
+ ReleaseConnection() error
+
+ // This indicates the connection is an invalid state, and that the
+ // connection should be discarded from the connection pool.
+ DiscardConnection() error
+}
+
+// A physical implementation of ManagedConn
+type managedConnImpl struct {
+ addr NetworkAddress
+ handle resource_pool.ManagedHandle
+ pool ConnectionPool
+ options ConnectionOptions
+}
+
+// This creates a managed connection wrapper.
+func NewManagedConn(
+ network string,
+ address string,
+ handle resource_pool.ManagedHandle,
+ pool ConnectionPool,
+ options ConnectionOptions) ManagedConn {
+
+ addr := NetworkAddress{
+ Network: network,
+ Address: address,
+ }
+
+ return &managedConnImpl{
+ addr: addr,
+ handle: handle,
+ pool: pool,
+ options: options,
+ }
+}
+
+func (c *managedConnImpl) rawConn() (net.Conn, error) {
+ h, err := c.handle.Handle()
+ return h.(net.Conn), err
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) RawConn() net.Conn {
+ h, _ := c.handle.Handle()
+ return h.(net.Conn)
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) Key() NetworkAddress {
+ return c.addr
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) Owner() ConnectionPool {
+ return c.pool
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) ReleaseConnection() error {
+ return c.handle.Release()
+}
+
+// See ManagedConn for documentation.
+func (c *managedConnImpl) DiscardConnection() error {
+ return c.handle.Discard()
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) Read(b []byte) (n int, err error) {
+ conn, err := c.rawConn()
+ if err != nil {
+ return 0, err
+ }
+
+ if c.options.ReadTimeout > 0 {
+ deadline := c.options.getCurrentTime().Add(c.options.ReadTimeout)
+ _ = conn.SetReadDeadline(deadline)
+ }
+ n, err = conn.Read(b)
+ if err != nil {
+ var localAddr string
+ if conn.LocalAddr() != nil {
+ localAddr = conn.LocalAddr().String()
+ } else {
+ localAddr = "(nil)"
+ }
+
+ var remoteAddr string
+ if conn.RemoteAddr() != nil {
+ remoteAddr = conn.RemoteAddr().String()
+ } else {
+ remoteAddr = "(nil)"
+ }
+ err = fmt.Errorf("Read error from host: %s <-> %s: %v", localAddr, remoteAddr, err)
+ }
+ return
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) Write(b []byte) (n int, err error) {
+ conn, err := c.rawConn()
+ if err != nil {
+ return 0, err
+ }
+
+ if c.options.WriteTimeout > 0 {
+ deadline := c.options.getCurrentTime().Add(c.options.WriteTimeout)
+ _ = conn.SetWriteDeadline(deadline)
+ }
+ n, err = conn.Write(b)
+ if err != nil {
+ err = fmt.Errorf("Write error: %v", err)
+ }
+ return
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) Close() error {
+ return c.handle.Discard()
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) LocalAddr() net.Addr {
+ conn, _ := c.rawConn()
+ return conn.LocalAddr()
+}
+
+// See net.Conn for documentation
+func (c *managedConnImpl) RemoteAddr() net.Addr {
+ conn, _ := c.rawConn()
+ return conn.RemoteAddr()
+}
+
+// SetDeadline is disabled for managed connection (The deadline is set by
+// us, with respect to the read/write timeouts specified in ConnectionOptions).
+func (c *managedConnImpl) SetDeadline(t time.Time) error {
+ return errors.New("Cannot set deadline for managed connection")
+}
+
+// SetReadDeadline is disabled for managed connection (The deadline is set by
+// us with respect to the read timeout specified in ConnectionOptions).
+func (c *managedConnImpl) SetReadDeadline(t time.Time) error {
+ return errors.New("Cannot set read deadline for managed connection")
+}
+
+// SetWriteDeadline is disabled for managed connection (The deadline is set by
+// us with respect to the write timeout specified in ConnectionOptions).
+func (c *managedConnImpl) SetWriteDeadline(t time.Time) error {
+ return errors.New("Cannot set write deadline for managed connection")
+}
diff --git a/weed/wdclient/net2/port.go b/weed/wdclient/net2/port.go
new file mode 100644
index 000000000..f83adba28
--- /dev/null
+++ b/weed/wdclient/net2/port.go
@@ -0,0 +1,19 @@
+package net2
+
+import (
+ "net"
+ "strconv"
+)
+
+// Returns the port information.
+func GetPort(addr net.Addr) (int, error) {
+ _, lport, err := net.SplitHostPort(addr.String())
+ if err != nil {
+ return -1, err
+ }
+ lportInt, err := strconv.Atoi(lport)
+ if err != nil {
+ return -1, err
+ }
+ return lportInt, nil
+}
diff --git a/weed/wdclient/resource_pool/doc.go b/weed/wdclient/resource_pool/doc.go
new file mode 100644
index 000000000..c17b17c6c
--- /dev/null
+++ b/weed/wdclient/resource_pool/doc.go
@@ -0,0 +1,5 @@
+// A generic resource pool for managing resources such as network connections.
+package resource_pool
+
+// copied from https://github.com/dropbox/godropbox/tree/master/resource_pool
+// removed other dependencies \ No newline at end of file
diff --git a/weed/wdclient/resource_pool/managed_handle.go b/weed/wdclient/resource_pool/managed_handle.go
new file mode 100644
index 000000000..e1d82ca7b
--- /dev/null
+++ b/weed/wdclient/resource_pool/managed_handle.go
@@ -0,0 +1,97 @@
+package resource_pool
+
+import (
+ "sync/atomic"
+
+ "errors"
+)
+
+// A resource handle managed by a resource pool.
+type ManagedHandle interface {
+ // This returns the handle's resource location.
+ ResourceLocation() string
+
+ // This returns the underlying resource handle (or error if the handle
+ // is no longer active).
+ Handle() (interface{}, error)
+
+ // This returns the resource pool which owns this handle.
+ Owner() ResourcePool
+
+ // The releases the underlying resource handle to the caller and marks the
+ // managed handle as inactive. The caller is responsible for cleaning up
+ // the released handle. This returns nil if the managed handle no longer
+ // owns the resource.
+ ReleaseUnderlyingHandle() interface{}
+
+ // This indictes a user is done with the handle and releases the handle
+ // back to the resource pool.
+ Release() error
+
+ // This indicates the handle is an invalid state, and that the
+ // connection should be discarded from the connection pool.
+ Discard() error
+}
+
+// A physical implementation of ManagedHandle
+type managedHandleImpl struct {
+ location string
+ handle interface{}
+ pool ResourcePool
+ isActive int32 // atomic bool
+ options Options
+}
+
+// This creates a managed handle wrapper.
+func NewManagedHandle(
+ resourceLocation string,
+ handle interface{},
+ pool ResourcePool,
+ options Options) ManagedHandle {
+
+ h := &managedHandleImpl{
+ location: resourceLocation,
+ handle: handle,
+ pool: pool,
+ options: options,
+ }
+ atomic.StoreInt32(&h.isActive, 1)
+
+ return h
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) ResourceLocation() string {
+ return c.location
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Handle() (interface{}, error) {
+ if atomic.LoadInt32(&c.isActive) == 0 {
+ return c.handle, errors.New("Resource handle is no longer valid")
+ }
+ return c.handle, nil
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Owner() ResourcePool {
+ return c.pool
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) ReleaseUnderlyingHandle() interface{} {
+ if atomic.CompareAndSwapInt32(&c.isActive, 1, 0) {
+ return c.handle
+ }
+ return nil
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Release() error {
+ return c.pool.Release(c)
+}
+
+// See ManagedHandle for documentation.
+func (c *managedHandleImpl) Discard() error {
+ return c.pool.Discard(c)
+}
diff --git a/weed/wdclient/resource_pool/multi_resource_pool.go b/weed/wdclient/resource_pool/multi_resource_pool.go
new file mode 100644
index 000000000..9ac25526d
--- /dev/null
+++ b/weed/wdclient/resource_pool/multi_resource_pool.go
@@ -0,0 +1,200 @@
+package resource_pool
+
+import (
+ "fmt"
+ "sync"
+
+ "errors"
+)
+
+// A resource pool implementation that manages multiple resource location
+// entries. The handles to each resource location entry acts independently.
+// For example "tcp localhost:11211" could act as memcache
+// shard 0 and "tcp localhost:11212" could act as memcache shard 1.
+type multiResourcePool struct {
+ options Options
+
+ createPool func(Options) ResourcePool
+
+ rwMutex sync.RWMutex
+ isLameDuck bool // guarded by rwMutex
+ // NOTE: the locationPools is guarded by rwMutex, but the pool entries
+ // are not.
+ locationPools map[string]ResourcePool
+}
+
+// This returns a MultiResourcePool, which manages multiple
+// resource location entries. The handles to each resource location
+// entry acts independently.
+//
+// When createPool is nil, NewSimpleResourcePool is used as default.
+func NewMultiResourcePool(
+ options Options,
+ createPool func(Options) ResourcePool) ResourcePool {
+
+ if createPool == nil {
+ createPool = NewSimpleResourcePool
+ }
+
+ return &multiResourcePool{
+ options: options,
+ createPool: createPool,
+ rwMutex: sync.RWMutex{},
+ isLameDuck: false,
+ locationPools: make(map[string]ResourcePool),
+ }
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) NumActive() int32 {
+ total := int32(0)
+
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ for _, pool := range p.locationPools {
+ total += pool.NumActive()
+ }
+ return total
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) ActiveHighWaterMark() int32 {
+ high := int32(0)
+
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ for _, pool := range p.locationPools {
+ val := pool.ActiveHighWaterMark()
+ if val > high {
+ high = val
+ }
+ }
+ return high
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) NumIdle() int {
+ total := 0
+
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ for _, pool := range p.locationPools {
+ total += pool.NumIdle()
+ }
+ return total
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Register(resourceLocation string) error {
+ if resourceLocation == "" {
+ return errors.New("Registering invalid resource location")
+ }
+
+ p.rwMutex.Lock()
+ defer p.rwMutex.Unlock()
+
+ if p.isLameDuck {
+ return fmt.Errorf(
+ "Cannot register %s to lame duck resource pool",
+ resourceLocation)
+ }
+
+ if _, inMap := p.locationPools[resourceLocation]; inMap {
+ return nil
+ }
+
+ pool := p.createPool(p.options)
+ if err := pool.Register(resourceLocation); err != nil {
+ return err
+ }
+
+ p.locationPools[resourceLocation] = pool
+ return nil
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Unregister(resourceLocation string) error {
+ p.rwMutex.Lock()
+ defer p.rwMutex.Unlock()
+
+ if pool, inMap := p.locationPools[resourceLocation]; inMap {
+ _ = pool.Unregister("")
+ pool.EnterLameDuckMode()
+ delete(p.locationPools, resourceLocation)
+ }
+ return nil
+}
+
+func (p *multiResourcePool) ListRegistered() []string {
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ result := make([]string, 0, len(p.locationPools))
+ for key, _ := range p.locationPools {
+ result = append(result, key)
+ }
+
+ return result
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Get(
+ resourceLocation string) (ManagedHandle, error) {
+
+ pool := p.getPool(resourceLocation)
+ if pool == nil {
+ return nil, fmt.Errorf(
+ "%s is not registered in the resource pool",
+ resourceLocation)
+ }
+ return pool.Get(resourceLocation)
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Release(handle ManagedHandle) error {
+ pool := p.getPool(handle.ResourceLocation())
+ if pool == nil {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ return pool.Release(handle)
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) Discard(handle ManagedHandle) error {
+ pool := p.getPool(handle.ResourceLocation())
+ if pool == nil {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ return pool.Discard(handle)
+}
+
+// See ResourcePool for documentation.
+func (p *multiResourcePool) EnterLameDuckMode() {
+ p.rwMutex.Lock()
+ defer p.rwMutex.Unlock()
+
+ p.isLameDuck = true
+
+ for _, pool := range p.locationPools {
+ pool.EnterLameDuckMode()
+ }
+}
+
+func (p *multiResourcePool) getPool(resourceLocation string) ResourcePool {
+ p.rwMutex.RLock()
+ defer p.rwMutex.RUnlock()
+
+ if pool, inMap := p.locationPools[resourceLocation]; inMap {
+ return pool
+ }
+ return nil
+}
diff --git a/weed/wdclient/resource_pool/resource_pool.go b/weed/wdclient/resource_pool/resource_pool.go
new file mode 100644
index 000000000..26c433f50
--- /dev/null
+++ b/weed/wdclient/resource_pool/resource_pool.go
@@ -0,0 +1,96 @@
+package resource_pool
+
+import (
+ "time"
+)
+
+type Options struct {
+ // The maximum number of active resource handles per resource location. (A
+ // non-positive value indicates the number of active resource handles is
+ // unbounded).
+ MaxActiveHandles int32
+
+ // The maximum number of idle resource handles per resource location that
+ // are kept alive by the resource pool.
+ MaxIdleHandles uint32
+
+ // The maximum amount of time an idle resource handle can remain alive (if
+ // specified).
+ MaxIdleTime *time.Duration
+
+ // This limits the number of concurrent Open calls (there's no limit when
+ // OpenMaxConcurrency is non-positive).
+ OpenMaxConcurrency int
+
+ // This function creates a resource handle (e.g., a connection) for a
+ // resource location. The function must be thread-safe.
+ Open func(resourceLocation string) (
+ handle interface{},
+ err error)
+
+ // This function destroys a resource handle and performs the necessary
+ // cleanup to free up resources. The function must be thread-safe.
+ Close func(handle interface{}) error
+
+ // This specifies the now time function. When the function is non-nil, the
+ // resource pool will use the specified function instead of time.Now to
+ // generate the current time.
+ NowFunc func() time.Time
+}
+
+func (o Options) getCurrentTime() time.Time {
+ if o.NowFunc == nil {
+ return time.Now()
+ } else {
+ return o.NowFunc()
+ }
+}
+
+// A generic interface for managed resource pool. All resource pool
+// implementations must be threadsafe.
+type ResourcePool interface {
+ // This returns the number of active resource handles.
+ NumActive() int32
+
+ // This returns the highest number of actives handles for the entire
+ // lifetime of the pool. If the pool contains multiple sub-pools, the
+ // high water mark is the max of the sub-pools' high water marks.
+ ActiveHighWaterMark() int32
+
+ // This returns the number of alive idle handles. NOTE: This is only used
+ // for testing.
+ NumIdle() int
+
+ // This associates a resource location to the resource pool; afterwhich,
+ // the user can get resource handles for the resource location.
+ Register(resourceLocation string) error
+
+ // This dissociates a resource location from the resource pool; afterwhich,
+ // the user can no longer get resource handles for the resource location.
+ // If the given resource location corresponds to a sub-pool, the unregistered
+ // sub-pool will enter lame duck mode.
+ Unregister(resourceLocation string) error
+
+ // This returns the list of registered resource location entries.
+ ListRegistered() []string
+
+ // This gets an active resource handle from the resource pool. The
+ // handle will remain active until one of the following is called:
+ // 1. handle.Release()
+ // 2. handle.Discard()
+ // 3. pool.Release(handle)
+ // 4. pool.Discard(handle)
+ Get(key string) (ManagedHandle, error)
+
+ // This releases an active resource handle back to the resource pool.
+ Release(handle ManagedHandle) error
+
+ // This discards an active resource from the resource pool.
+ Discard(handle ManagedHandle) error
+
+ // Enter the resource pool into lame duck mode. The resource pool
+ // will no longer return resource handles, and all idle resource handles
+ // are closed immediately (including active resource handles that are
+ // released back to the pool afterward).
+ EnterLameDuckMode()
+}
diff --git a/weed/wdclient/resource_pool/semaphore.go b/weed/wdclient/resource_pool/semaphore.go
new file mode 100644
index 000000000..ff35d5bc5
--- /dev/null
+++ b/weed/wdclient/resource_pool/semaphore.go
@@ -0,0 +1,154 @@
+package resource_pool
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type Semaphore interface {
+ // Increment the semaphore counter by one.
+ Release()
+
+ // Decrement the semaphore counter by one, and block if counter < 0
+ Acquire()
+
+ // Decrement the semaphore counter by one, and block if counter < 0
+ // Wait for up to the given duration. Returns true if did not timeout
+ TryAcquire(timeout time.Duration) bool
+}
+
+// A simple counting Semaphore.
+type boundedSemaphore struct {
+ slots chan struct{}
+}
+
+// Create a bounded semaphore. The count parameter must be a positive number.
+// NOTE: The bounded semaphore will panic if the user tries to Release
+// beyond the specified count.
+func NewBoundedSemaphore(count uint) Semaphore {
+ sem := &boundedSemaphore{
+ slots: make(chan struct{}, int(count)),
+ }
+ for i := 0; i < cap(sem.slots); i++ {
+ sem.slots <- struct{}{}
+ }
+ return sem
+}
+
+// Acquire returns on successful acquisition.
+func (sem *boundedSemaphore) Acquire() {
+ <-sem.slots
+}
+
+// TryAcquire returns true if it acquires a resource slot within the
+// timeout, false otherwise.
+func (sem *boundedSemaphore) TryAcquire(timeout time.Duration) bool {
+ if timeout > 0 {
+ // Wait until we get a slot or timeout expires.
+ tm := time.NewTimer(timeout)
+ defer tm.Stop()
+ select {
+ case <-sem.slots:
+ return true
+ case <-tm.C:
+ // Timeout expired. In very rare cases this might happen even if
+ // there is a slot available, e.g. GC pause after we create the timer
+ // and select randomly picked this one out of the two available channels.
+ // We should do one final immediate check below.
+ }
+ }
+
+ // Return true if we have a slot available immediately and false otherwise.
+ select {
+ case <-sem.slots:
+ return true
+ default:
+ return false
+ }
+}
+
+// Release the acquired semaphore. You must not release more than you
+// have acquired.
+func (sem *boundedSemaphore) Release() {
+ select {
+ case sem.slots <- struct{}{}:
+ default:
+ // slots is buffered. If a send blocks, it indicates a programming
+ // error.
+ panic(fmt.Errorf("too many releases for boundedSemaphore"))
+ }
+}
+
+// This returns an unbound counting semaphore with the specified initial count.
+// The semaphore counter can be arbitrary large (i.e., Release can be called
+// unlimited amount of times).
+//
+// NOTE: In general, users should use bounded semaphore since it is more
+// efficient than unbounded semaphore.
+func NewUnboundedSemaphore(initialCount int) Semaphore {
+ res := &unboundedSemaphore{
+ counter: int64(initialCount),
+ }
+ res.cond.L = &res.lock
+ return res
+}
+
+type unboundedSemaphore struct {
+ lock sync.Mutex
+ cond sync.Cond
+ counter int64
+}
+
+func (s *unboundedSemaphore) Release() {
+ s.lock.Lock()
+ s.counter += 1
+ if s.counter > 0 {
+ // Not broadcasting here since it's unlike we can satify all waiting
+ // goroutines. Instead, we will Signal again if there are left over
+ // quota after Acquire, in case of lost wakeups.
+ s.cond.Signal()
+ }
+ s.lock.Unlock()
+}
+
+func (s *unboundedSemaphore) Acquire() {
+ s.lock.Lock()
+ for s.counter < 1 {
+ s.cond.Wait()
+ }
+ s.counter -= 1
+ if s.counter > 0 {
+ s.cond.Signal()
+ }
+ s.lock.Unlock()
+}
+
+func (s *unboundedSemaphore) TryAcquire(timeout time.Duration) bool {
+ done := make(chan bool, 1)
+ // Gate used to communicate between the threads and decide what the result
+ // is. If the main thread decides, we have timed out, otherwise we succeed.
+ decided := new(int32)
+ atomic.StoreInt32(decided, 0)
+ go func() {
+ s.Acquire()
+ if atomic.SwapInt32(decided, 1) == 0 {
+ // Acquire won the race
+ done <- true
+ } else {
+ // If we already decided the result, and this thread did not win
+ s.Release()
+ }
+ }()
+ select {
+ case <-done:
+ return true
+ case <-time.After(timeout):
+ if atomic.SwapInt32(decided, 1) == 1 {
+ // The other thread already decided the result
+ return true
+ }
+ return false
+ }
+}
diff --git a/weed/wdclient/resource_pool/simple_resource_pool.go b/weed/wdclient/resource_pool/simple_resource_pool.go
new file mode 100644
index 000000000..b0c539100
--- /dev/null
+++ b/weed/wdclient/resource_pool/simple_resource_pool.go
@@ -0,0 +1,343 @@
+package resource_pool
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type idleHandle struct {
+ handle interface{}
+ keepUntil *time.Time
+}
+
+type TooManyHandles struct {
+ location string
+}
+
+func (t TooManyHandles) Error() string {
+ return fmt.Sprintf("Too many handles to %s", t.location)
+}
+
+type OpenHandleError struct {
+ location string
+ err error
+}
+
+func (o OpenHandleError) Error() string {
+ return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err)
+}
+
+// A resource pool implementation where all handles are associated to the
+// same resource location.
+type simpleResourcePool struct {
+ options Options
+
+ numActive *int32 // atomic counter
+
+ activeHighWaterMark *int32 // atomic / monotonically increasing value
+
+ openTokens Semaphore
+
+ mutex sync.Mutex
+ location string // guard by mutex
+ idleHandles []*idleHandle // guarded by mutex
+ isLameDuck bool // guarded by mutex
+}
+
+// This returns a SimpleResourcePool, where all handles are associated to a
+// single resource location.
+func NewSimpleResourcePool(options Options) ResourcePool {
+ numActive := new(int32)
+ atomic.StoreInt32(numActive, 0)
+
+ activeHighWaterMark := new(int32)
+ atomic.StoreInt32(activeHighWaterMark, 0)
+
+ var tokens Semaphore
+ if options.OpenMaxConcurrency > 0 {
+ tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency))
+ }
+
+ return &simpleResourcePool{
+ location: "",
+ options: options,
+ numActive: numActive,
+ activeHighWaterMark: activeHighWaterMark,
+ openTokens: tokens,
+ mutex: sync.Mutex{},
+ idleHandles: make([]*idleHandle, 0, 0),
+ isLameDuck: false,
+ }
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) NumActive() int32 {
+ return atomic.LoadInt32(p.numActive)
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) ActiveHighWaterMark() int32 {
+ return atomic.LoadInt32(p.activeHighWaterMark)
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) NumIdle() int {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+ return len(p.idleHandles)
+}
+
+// SimpleResourcePool can only register a single (network, address) entry.
+// Register should be call before any Get calls.
+func (p *simpleResourcePool) Register(resourceLocation string) error {
+ if resourceLocation == "" {
+ return errors.New("Invalid resource location")
+ }
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.isLameDuck {
+ return fmt.Errorf(
+ "cannot register %s to lame duck resource pool",
+ resourceLocation)
+ }
+
+ if p.location == "" {
+ p.location = resourceLocation
+ return nil
+ }
+ return errors.New("SimpleResourcePool can only register one location")
+}
+
+// SimpleResourcePool will enter lame duck mode upon calling Unregister.
+func (p *simpleResourcePool) Unregister(resourceLocation string) error {
+ p.EnterLameDuckMode()
+ return nil
+}
+
+func (p *simpleResourcePool) ListRegistered() []string {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.location != "" {
+ return []string{p.location}
+ }
+ return []string{}
+}
+
+func (p *simpleResourcePool) getLocation() (string, error) {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.location == "" {
+ return "", fmt.Errorf(
+ "resource location is not set for SimpleResourcePool")
+ }
+
+ if p.isLameDuck {
+ return "", fmt.Errorf(
+ "lame duck resource pool cannot return handles to %s",
+ p.location)
+ }
+
+ return p.location, nil
+}
+
+// This gets an active resource from the resource pool. Note that the
+// resourceLocation argument is ignored (The handles are associated to the
+// resource location provided by the first Register call).
+func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) {
+ activeCount := atomic.AddInt32(p.numActive, 1)
+ if p.options.MaxActiveHandles > 0 &&
+ activeCount > p.options.MaxActiveHandles {
+
+ atomic.AddInt32(p.numActive, -1)
+ return nil, TooManyHandles{p.location}
+ }
+
+ highest := atomic.LoadInt32(p.activeHighWaterMark)
+ for activeCount > highest &&
+ !atomic.CompareAndSwapInt32(
+ p.activeHighWaterMark,
+ highest,
+ activeCount) {
+
+ highest = atomic.LoadInt32(p.activeHighWaterMark)
+ }
+
+ if h := p.getIdleHandle(); h != nil {
+ return h, nil
+ }
+
+ location, err := p.getLocation()
+ if err != nil {
+ atomic.AddInt32(p.numActive, -1)
+ return nil, err
+ }
+
+ if p.openTokens != nil {
+ // Current implementation does not wait for tokens to become available.
+ // If that causes availability hits, we could increase the wait,
+ // similar to simple_pool.go.
+ if p.openTokens.TryAcquire(0) {
+ defer p.openTokens.Release()
+ } else {
+ // We could not immediately acquire a token.
+ // Instead of waiting
+ atomic.AddInt32(p.numActive, -1)
+ return nil, OpenHandleError{
+ p.location, errors.New("Open Error: reached OpenMaxConcurrency")}
+ }
+ }
+
+ handle, err := p.options.Open(location)
+ if err != nil {
+ atomic.AddInt32(p.numActive, -1)
+ return nil, OpenHandleError{p.location, err}
+ }
+
+ return NewManagedHandle(p.location, handle, p, p.options), nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) Release(handle ManagedHandle) error {
+ if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ h := handle.ReleaseUnderlyingHandle()
+ if h != nil {
+ // We can unref either before or after queuing the idle handle.
+ // The advantage of unref-ing before queuing is that there is
+ // a higher chance of successful Get when number of active handles
+ // is close to the limit (but potentially more handle creation).
+ // The advantage of queuing before unref-ing is that there's a
+ // higher chance of reusing handle (but potentially more Get failures).
+ atomic.AddInt32(p.numActive, -1)
+ p.queueIdleHandles(h)
+ }
+
+ return nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) Discard(handle ManagedHandle) error {
+ if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
+ return errors.New(
+ "Resource pool cannot take control of a handle owned " +
+ "by another resource pool")
+ }
+
+ h := handle.ReleaseUnderlyingHandle()
+ if h != nil {
+ atomic.AddInt32(p.numActive, -1)
+ if err := p.options.Close(h); err != nil {
+ return fmt.Errorf("failed to close resource handle: %v", err)
+ }
+ }
+ return nil
+}
+
+// See ResourcePool for documentation.
+func (p *simpleResourcePool) EnterLameDuckMode() {
+ p.mutex.Lock()
+
+ toClose := p.idleHandles
+ p.isLameDuck = true
+ p.idleHandles = []*idleHandle{}
+
+ p.mutex.Unlock()
+
+ p.closeHandles(toClose)
+}
+
+// This returns an idle resource, if there is one.
+func (p *simpleResourcePool) getIdleHandle() ManagedHandle {
+ var toClose []*idleHandle
+ defer func() {
+ // NOTE: Must keep the closure around to late bind the toClose slice.
+ p.closeHandles(toClose)
+ }()
+
+ now := p.options.getCurrentTime()
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ var i int
+ for i = 0; i < len(p.idleHandles); i++ {
+ idle := p.idleHandles[i]
+ if idle.keepUntil == nil || now.Before(*idle.keepUntil) {
+ break
+ }
+ }
+ if i > 0 {
+ toClose = p.idleHandles[0:i]
+ }
+
+ if i < len(p.idleHandles) {
+ idle := p.idleHandles[i]
+ p.idleHandles = p.idleHandles[i+1:]
+ return NewManagedHandle(p.location, idle.handle, p, p.options)
+ }
+
+ if len(p.idleHandles) > 0 {
+ p.idleHandles = []*idleHandle{}
+ }
+ return nil
+}
+
+// This adds an idle resource to the pool.
+func (p *simpleResourcePool) queueIdleHandles(handle interface{}) {
+ var toClose []*idleHandle
+ defer func() {
+ // NOTE: Must keep the closure around to late bind the toClose slice.
+ p.closeHandles(toClose)
+ }()
+
+ now := p.options.getCurrentTime()
+ var keepUntil *time.Time
+ if p.options.MaxIdleTime != nil {
+ // NOTE: Assign to temp variable first to work around compiler bug
+ x := now.Add(*p.options.MaxIdleTime)
+ keepUntil = &x
+ }
+
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ if p.isLameDuck {
+ toClose = []*idleHandle{
+ {handle: handle},
+ }
+ return
+ }
+
+ p.idleHandles = append(
+ p.idleHandles,
+ &idleHandle{
+ handle: handle,
+ keepUntil: keepUntil,
+ })
+
+ nIdleHandles := uint32(len(p.idleHandles))
+ if nIdleHandles > p.options.MaxIdleHandles {
+ handlesToClose := nIdleHandles - p.options.MaxIdleHandles
+ toClose = p.idleHandles[0:handlesToClose]
+ p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles]
+ }
+}
+
+// Closes resources, at this point it is assumed that this resources
+// are no longer referenced from the main idleHandles slice.
+func (p *simpleResourcePool) closeHandles(handles []*idleHandle) {
+ for _, handle := range handles {
+ _ = p.options.Close(handle.handle)
+ }
+}