aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-02-15 14:01:37 -0800
committerChris Lu <chris.lu@gmail.com>2020-02-15 14:01:37 -0800
commit1477eead0101ee479e92ce479dbcf38bece320db (patch)
treeb167485d19f414376f9223ef83726e1ae723a98a /weed/util
parentc7ac94ea9a3bc2842ce41c1fcad6fc23781c1296 (diff)
downloadseaweedfs-origin/tcp_read.tar.xz
seaweedfs-origin/tcp_read.zip
final attemptorigin/tcp_read
on par with 1K sized object, but no so good with large ones the default http flow control is better than current implementation.
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/pool.go98
1 files changed, 98 insertions, 0 deletions
diff --git a/weed/util/pool.go b/weed/util/pool.go
new file mode 100644
index 000000000..db137ec0b
--- /dev/null
+++ b/weed/util/pool.go
@@ -0,0 +1,98 @@
+package util
+
+import (
+ "errors"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+var (
+ TimeoutErr = errors.New("timeout")
+)
+
+// A bufferedChan implemented by a buffered channel
+type ResourcePool struct {
+ sync.Mutex
+ bufferedChan chan interface{}
+ poolSizeLimit int
+ inuse int
+ newFn func() (interface{}, error)
+}
+
+func NewResourcePool(poolSizeLimit int, newFn func() (interface{}, error)) *ResourcePool {
+ p := &ResourcePool{
+ poolSizeLimit: poolSizeLimit,
+ newFn: newFn,
+ bufferedChan: make(chan interface{}, poolSizeLimit),
+ }
+ return p
+}
+
+func (p *ResourcePool) Size() int {
+ p.Lock()
+ defer p.Unlock()
+ return len(p.bufferedChan) + p.inuse
+}
+
+func (p *ResourcePool) Free() int {
+ p.Lock()
+ defer p.Unlock()
+ return p.poolSizeLimit - p.inuse
+}
+
+func (p *ResourcePool) Get(timeout time.Duration) (interface{}, error) {
+ d, err := p.get(timeout)
+ if err != nil {
+ return nil, err
+ }
+ if d == nil && p.newFn != nil {
+ var err error
+ d, err = p.newFn()
+ if err != nil {
+ return nil, err
+ }
+ }
+ p.Lock()
+ defer p.Unlock()
+ p.inuse++
+ return d, nil
+}
+
+func (p *ResourcePool) Release(v interface{}) {
+
+ p.Lock()
+ defer p.Unlock()
+ if p.inuse == 0 {
+ glog.V(0).Infof("released too many times?")
+ return
+ }
+ p.bufferedChan <- v
+ p.inuse--
+}
+
+func (p *ResourcePool) get(timeout time.Duration) (interface{}, error) {
+
+ select {
+ case v := <-p.bufferedChan:
+ return v, nil
+ default:
+ }
+
+ if p.Free() > 0 {
+ d, err := p.newFn()
+ if err != nil {
+ return nil, err
+ }
+ return d, nil
+ }
+
+ // wait for an freed item
+ select {
+ case v := <-p.bufferedChan:
+ return v, nil
+ case <-time.After(timeout):
+ }
+ return nil, TimeoutErr
+}