aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/chunk_cache/chunk_cache_in_memory.go4
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk.go10
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/fasthttp_util.go115
-rw-r--r--weed/util/fla9/fla9.go2
-rw-r--r--weed/util/http_util.go23
-rw-r--r--weed/util/limiter.go74
7 files changed, 107 insertions, 123 deletions
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index 1eb00e1fa..5f26b8c78 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -32,5 +32,7 @@ func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte {
}
func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
- c.cache.Set(fileId, data, time.Hour)
+ localCopy := make([]byte, len(data))
+ copy(localCopy, data)
+ c.cache.Set(fileId, localCopy, time.Hour)
}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
index d724e925e..6f87a9a06 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -88,15 +88,17 @@ func (v *ChunkCacheVolume) Shutdown() {
}
}
-func (v *ChunkCacheVolume) destroy() {
+func (v *ChunkCacheVolume) doReset() {
v.Shutdown()
- os.Remove(v.fileName + ".dat")
- os.Remove(v.fileName + ".idx")
+ os.Truncate(v.fileName + ".dat", 0)
+ os.Truncate(v.fileName + ".idx", 0)
+ glog.V(4).Infof("cache removeAll %s ...", v.fileName + ".ldb")
os.RemoveAll(v.fileName + ".ldb")
+ glog.V(4).Infof("cache removed %s", v.fileName + ".ldb")
}
func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) {
- v.destroy()
+ v.doReset()
return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit)
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 55f5b52d6..c595f0c53 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 28)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 41)
COMMIT = ""
)
diff --git a/weed/util/fasthttp_util.go b/weed/util/fasthttp_util.go
deleted file mode 100644
index 82575af98..000000000
--- a/weed/util/fasthttp_util.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package util
-
-import (
- "bytes"
- "fmt"
- "github.com/valyala/fasthttp"
- "sync"
- "time"
-)
-
-var (
- fastClient = &fasthttp.Client{
- NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp
- MaxConnsPerHost: 1024,
- ReadBufferSize: 4096, // Make sure to set this big enough that your whole request can be read at once.
- WriteBufferSize: 64 * 1024, // Same but for your response.
- ReadTimeout: time.Second,
- WriteTimeout: time.Second,
- MaxIdleConnDuration: time.Minute,
- DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this.
- }
-
- // Put everything in pools to prevent garbage.
- bytesPool = sync.Pool{
- New: func() interface{} {
- b := make([]byte, 0)
- return &b
- },
- }
-
- responsePool = sync.Pool{
- New: func() interface{} {
- return make(chan *fasthttp.Response)
- },
- }
-)
-
-func FastGet(url string) ([]byte, bool, error) {
-
- req := fasthttp.AcquireRequest()
- res := fasthttp.AcquireResponse()
- defer fasthttp.ReleaseRequest(req)
- defer fasthttp.ReleaseResponse(res)
-
- req.SetRequestURIBytes([]byte(url))
- req.Header.Add("Accept-Encoding", "gzip")
-
- err := fastClient.Do(req, res)
- if err != nil {
- return nil, true, err
- }
-
- var data []byte
- contentEncoding := res.Header.Peek("Content-Encoding")
- if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
- data, err = res.BodyGunzip()
- } else {
- data = res.Body()
- }
-
- out := make([]byte, len(data))
- copy(out, data)
-
- if res.StatusCode() >= 400 {
- retryable := res.StatusCode() >= 500
- return nil, retryable, fmt.Errorf("%s: %d", url, res.StatusCode())
- }
- if err != nil {
- return nil, false, err
- }
- return out, false, nil
-}
-
-func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
-
- if cipherKey != nil {
- return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
- }
-
- req := fasthttp.AcquireRequest()
- res := fasthttp.AcquireResponse()
- defer fasthttp.ReleaseRequest(req)
- defer fasthttp.ReleaseResponse(res)
-
- req.SetRequestURIBytes([]byte(fileUrl))
-
- if isFullChunk {
- req.Header.Add("Accept-Encoding", "gzip")
- } else {
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
- }
-
- if err = fastClient.Do(req, res); err != nil {
- return true, err
- }
-
- if res.StatusCode() >= 400 {
- retryable = res.StatusCode() >= 500
- return retryable, fmt.Errorf("%s: %d", fileUrl, res.StatusCode())
- }
-
- contentEncoding := res.Header.Peek("Content-Encoding")
- if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
- bodyData, err := res.BodyGunzip()
- if err != nil {
- return false, err
- }
- fn(bodyData)
- } else {
- fn(res.Body())
- }
-
- return false, nil
-
-}
diff --git a/weed/util/fla9/fla9.go b/weed/util/fla9/fla9.go
index 1538daa55..eb5700e8c 100644
--- a/weed/util/fla9/fla9.go
+++ b/weed/util/fla9/fla9.go
@@ -886,7 +886,7 @@ func (f *FlagSet) parseOne() (bool, error) {
// The return value will be ErrHelp if -help or -h were set but not defined.
func (f *FlagSet) Parse(arguments []string) error {
if _, ok := f.formal[DefaultConfigFlagName]; !ok {
- f.String(DefaultConfigFlagName, "", "file with command line options with each line in optionName=optionValue format")
+ f.String(DefaultConfigFlagName, "", "a file of command line options, each line in optionName=optionValue format")
}
f.parsed = true
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index eff282bab..1630760b1 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -124,6 +124,27 @@ func Delete(url string, jwt string) error {
return errors.New(string(body))
}
+func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
+ req, err := http.NewRequest("DELETE", url, nil)
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+ if err != nil {
+ return
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return
+ }
+ defer resp.Body.Close()
+ body, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return
+ }
+ httpStatus = resp.StatusCode
+ return
+}
+
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
r, err := client.PostForm(url, values)
if err != nil {
@@ -313,7 +334,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
}
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
- encryptedData, retryable, err := FastGet(fileUrl)
+ encryptedData, retryable, err := Get(fileUrl)
if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
index 91499632c..2debaaa85 100644
--- a/weed/util/limiter.go
+++ b/weed/util/limiter.go
@@ -1,5 +1,12 @@
package util
+import (
+ "math/rand"
+ "reflect"
+ "sync"
+ "sync/atomic"
+)
+
// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
// LimitedConcurrentExecutor object
@@ -38,3 +45,70 @@ func (c *LimitedConcurrentExecutor) Execute(job func()) {
job()
}()
}
+
+// a different implementation, but somehow more "conservative"
+type OperationRequest func()
+
+type LimitedOutOfOrderProcessor struct {
+ processorSlots uint32
+ processors []chan OperationRequest
+ processorLimit int32
+ processorLimitCond *sync.Cond
+ currentProcessor int32
+}
+
+func NewLimitedOutOfOrderProcessor(limit int32) (c *LimitedOutOfOrderProcessor) {
+
+ processorSlots := uint32(32)
+ c = &LimitedOutOfOrderProcessor{
+ processorSlots: processorSlots,
+ processors: make([]chan OperationRequest, processorSlots),
+ processorLimit: limit,
+ processorLimitCond: sync.NewCond(new(sync.Mutex)),
+ }
+
+ for i := 0; i < int(processorSlots); i++ {
+ c.processors[i] = make(chan OperationRequest)
+ }
+
+ cases := make([]reflect.SelectCase, processorSlots)
+ for i, ch := range c.processors {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
+
+ go func() {
+ for {
+ _, value, ok := reflect.Select(cases)
+ if !ok {
+ continue
+ }
+
+ request := value.Interface().(OperationRequest)
+
+ if c.processorLimit > 0 {
+ c.processorLimitCond.L.Lock()
+ for atomic.LoadInt32(&c.currentProcessor) > c.processorLimit {
+ c.processorLimitCond.Wait()
+ }
+ atomic.AddInt32(&c.currentProcessor, 1)
+ c.processorLimitCond.L.Unlock()
+ }
+
+ go func() {
+ if c.processorLimit > 0 {
+ defer atomic.AddInt32(&c.currentProcessor, -1)
+ defer c.processorLimitCond.Signal()
+ }
+ request()
+ }()
+
+ }
+ }()
+
+ return c
+}
+
+func (c *LimitedOutOfOrderProcessor) Execute(request OperationRequest) {
+ index := rand.Uint32() % c.processorSlots
+ c.processors[index] <- request
+}