Diffstat (limited to 'weed/util')
-rw-r--r--   weed/util/chunk_cache/chunk_cache_in_memory.go |   4
-rw-r--r--   weed/util/constants.go                         |   2
-rw-r--r--   weed/util/fasthttp_util.go                     | 115
-rw-r--r--   weed/util/fla9/fla9.go                         |   5
-rw-r--r--   weed/util/http_util.go                         |   2
-rw-r--r--   weed/util/limiter.go                           |  74
6 files changed, 82 insertions, 120 deletions
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index 1eb00e1fa..5f26b8c78 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -32,5 +32,7 @@ func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte {
}
func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
- c.cache.Set(fileId, data, time.Hour)
+ localCopy := make([]byte, len(data))
+ copy(localCopy, data)
+ c.cache.Set(fileId, localCopy, time.Hour)
}
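
Note on the SetChunk change above: the cache now stores its own copy of the chunk rather than aliasing the caller's slice. A minimal sketch of the aliasing bug this avoids (the tiny map-backed cache below is illustrative only, not the actual cache implementation behind ChunkCacheInMemory):

// Illustrative only: shows why SetChunk copies before caching.
package main

import "fmt"

type memCache struct{ entries map[string][]byte }

// setAliased keeps a reference to the caller's buffer (the old behavior).
func (c *memCache) setAliased(fileId string, data []byte) { c.entries[fileId] = data }

// setCopied stores a private copy (the new SetChunk behavior).
func (c *memCache) setCopied(fileId string, data []byte) {
	localCopy := make([]byte, len(data))
	copy(localCopy, data)
	c.entries[fileId] = localCopy
}

func main() {
	c := &memCache{entries: map[string][]byte{}}

	buf := []byte("version-1")
	c.setAliased("fileA", buf)
	buf[8] = '2' // caller reuses its buffer after caching
	fmt.Println(string(c.entries["fileA"])) // "version-2": the cached chunk was corrupted

	buf = []byte("version-1")
	c.setCopied("fileB", buf)
	buf[8] = '2'
	fmt.Println(string(c.entries["fileB"])) // "version-1": the cached chunk is unaffected
}

The copy costs one allocation per cached chunk, but it guarantees the cache owns its data regardless of what the caller does with the buffer afterwards.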
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 55f5b52d6..40f4deae2 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 28)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 38)
COMMIT = ""
)
diff --git a/weed/util/fasthttp_util.go b/weed/util/fasthttp_util.go
deleted file mode 100644
index 82575af98..000000000
--- a/weed/util/fasthttp_util.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package util
-
-import (
- "bytes"
- "fmt"
- "github.com/valyala/fasthttp"
- "sync"
- "time"
-)
-
-var (
- fastClient = &fasthttp.Client{
- NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp
- MaxConnsPerHost: 1024,
- ReadBufferSize: 4096, // Make sure to set this big enough that your whole request can be read at once.
- WriteBufferSize: 64 * 1024, // Same but for your response.
- ReadTimeout: time.Second,
- WriteTimeout: time.Second,
- MaxIdleConnDuration: time.Minute,
- DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this.
- }
-
- // Put everything in pools to prevent garbage.
- bytesPool = sync.Pool{
- New: func() interface{} {
- b := make([]byte, 0)
- return &b
- },
- }
-
- responsePool = sync.Pool{
- New: func() interface{} {
- return make(chan *fasthttp.Response)
- },
- }
-)
-
-func FastGet(url string) ([]byte, bool, error) {
-
- req := fasthttp.AcquireRequest()
- res := fasthttp.AcquireResponse()
- defer fasthttp.ReleaseRequest(req)
- defer fasthttp.ReleaseResponse(res)
-
- req.SetRequestURIBytes([]byte(url))
- req.Header.Add("Accept-Encoding", "gzip")
-
- err := fastClient.Do(req, res)
- if err != nil {
- return nil, true, err
- }
-
- var data []byte
- contentEncoding := res.Header.Peek("Content-Encoding")
- if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
- data, err = res.BodyGunzip()
- } else {
- data = res.Body()
- }
-
- out := make([]byte, len(data))
- copy(out, data)
-
- if res.StatusCode() >= 400 {
- retryable := res.StatusCode() >= 500
- return nil, retryable, fmt.Errorf("%s: %d", url, res.StatusCode())
- }
- if err != nil {
- return nil, false, err
- }
- return out, false, nil
-}
-
-func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
-
- if cipherKey != nil {
- return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
- }
-
- req := fasthttp.AcquireRequest()
- res := fasthttp.AcquireResponse()
- defer fasthttp.ReleaseRequest(req)
- defer fasthttp.ReleaseResponse(res)
-
- req.SetRequestURIBytes([]byte(fileUrl))
-
- if isFullChunk {
- req.Header.Add("Accept-Encoding", "gzip")
- } else {
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
- }
-
- if err = fastClient.Do(req, res); err != nil {
- return true, err
- }
-
- if res.StatusCode() >= 400 {
- retryable = res.StatusCode() >= 500
- return retryable, fmt.Errorf("%s: %d", fileUrl, res.StatusCode())
- }
-
- contentEncoding := res.Header.Peek("Content-Encoding")
- if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
- bodyData, err := res.BodyGunzip()
- if err != nil {
- return false, err
- }
- fn(bodyData)
- } else {
- fn(res.Body())
- }
-
- return false, nil
-
-}
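
The fasthttp-based helpers are deleted outright; their only remaining caller, readEncryptedUrl in http_util.go below, switches to the plain Get helper. For orientation, here is a rough net/http sketch of what the removed FastGet did (gzip request, manual decompression, retryable classification). This is illustrative only; the real util.Get in http_util.go may differ in details.

// Illustrative sketch only: a net/http analogue of the removed FastGet.
// Not the actual util.Get in http_util.go.
package util_sketch

import (
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
)

func getURL(url string) (body []byte, retryable bool, err error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, false, err
	}
	// Setting Accept-Encoding manually disables net/http's transparent
	// decompression, so gzip has to be handled explicitly below.
	req.Header.Add("Accept-Encoding", "gzip")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, true, err // transport errors are treated as retryable
	}
	defer resp.Body.Close()

	var r io.Reader = resp.Body
	if resp.Header.Get("Content-Encoding") == "gzip" {
		gz, gzErr := gzip.NewReader(resp.Body)
		if gzErr != nil {
			return nil, false, gzErr
		}
		defer gz.Close()
		r = gz
	}

	data, err := io.ReadAll(r)
	if err != nil {
		return nil, false, err
	}
	if resp.StatusCode >= 400 {
		// mirror FastGet: only 5xx responses are considered retryable
		return nil, resp.StatusCode >= 500, fmt.Errorf("%s: %d", url, resp.StatusCode)
	}
	return data, false, nil
}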
diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go
index 4a5884e9b..eb5700e8c 100644
--- a/weed/util/fla9/fla9.go
+++ b/weed/util/fla9/fla9.go
@@ -886,7 +886,7 @@ func (f *FlagSet) parseOne() (bool, error) {
// The return value will be ErrHelp if -help or -h were set but not defined.
func (f *FlagSet) Parse(arguments []string) error {
if _, ok := f.formal[DefaultConfigFlagName]; !ok {
- f.String(DefaultConfigFlagName, "", "config file")
+ f.String(DefaultConfigFlagName, "", "a file of command line options, each line in optionName=optionValue format")
}
f.parsed = true
@@ -995,6 +995,7 @@ func NewFlagSet(name string, errorHandling ErrorHandling) *FlagSet {
f := &FlagSet{
name: name,
errorHandling: errorHandling,
+ envPrefix: EnvPrefix,
}
return f
}
@@ -1078,7 +1079,7 @@ func NewFlagSetWithEnvPrefix(name string, prefix string, errorHandling ErrorHand
// DefaultConfigFlagName defines the flag name of the optional config file
// path. Used to lookup and parse the config file when a default is set and
// available on disk.
-var DefaultConfigFlagName = "conf"
+var DefaultConfigFlagName = "options"
// ParseFile parses flags from the file in path.
// Same format as commandline arguments, newlines and lines beginning with a
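
With this change the implicit config flag is named -options instead of -conf, and its help text spells out the expected file format: one flag per line, written as optionName=optionValue. A hypothetical options file (the flag names here are chosen only for illustration) could look like:

ip=127.0.0.1
port=9333
mdir=/tmp/weed-master

Given the flag definition above, such a file would presumably be passed as -options=FILE, with each line treated as if it had been supplied on the command line.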
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index eff282bab..135d10c45 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -313,7 +313,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
}
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
- encryptedData, retryable, err := FastGet(fileUrl)
+ encryptedData, retryable, err := Get(fileUrl)
if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
index 91499632c..2debaaa85 100644
--- a/weed/util/limiter.go
+++ b/weed/util/limiter.go
@@ -1,5 +1,12 @@
package util
+import (
+ "math/rand"
+ "reflect"
+ "sync"
+ "sync/atomic"
+)
+
// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
// LimitedConcurrentExecutor object
@@ -38,3 +45,70 @@ func (c *LimitedConcurrentExecutor) Execute(job func()) {
job()
}()
}
+
+// a different implementation, but somehow more "conservative"
+type OperationRequest func()
+
+type LimitedOutOfOrderProcessor struct {
+ processorSlots uint32
+ processors []chan OperationRequest
+ processorLimit int32
+ processorLimitCond *sync.Cond
+ currentProcessor int32
+}
+
+func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
+
+ processorSlots := uint32(32)
+ c = &LimitedOutOfOrderProcessor{
+ processorSlots: processorSlots,
+ processors: make([]chan OperationRequest, processorSlots),
+ processorLimit: limit,
+ processorLimitCond: sync.NewCond(new(sync.Mutex)),
+ }
+
+ for i := 0; i < int(processorSlots); i++ {
+ c.processors[i] = make(chan OperationRequest)
+ }
+
+ cases := make([]reflect.SelectCase, processorSlots)
+ for i, ch := range c.processors {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
+
+ go func() {
+ for {
+ _, value, ok := reflect.Select(cases)
+ if !ok {
+ continue
+ }
+
+ request := value.Interface().(OperationRequest)
+
+ if c.processorLimit > 0 {
+ c.processorLimitCond.L.Lock()
+ for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
+ c.processorLimitCond.Wait()
+ }
+ atomic.AddInt32(&c.currentProcessor, 1)
+ c.processorLimitCond.L.Unlock()
+ }
+
+ go func() {
+ if c.processorLimit > 0 {
+ defer atomic.AddInt32(&c.currentProcessor, -1)
+ defer c.processorLimitCond.Signal()
+ }
+ request()
+ }()
+
+ }
+ }()
+
+ return c
+}
+
+func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
+ index := rand.Uint32() % c.processorSlots
+ c.processors[index] <- request
+}