aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/filer/filechunk_manifest.go2
-rw-r--r--weed/filer/read_write.go2
-rw-r--r--weed/filer/stream.go2
-rw-r--r--weed/replication/repl_util/replication_utli.go2
-rw-r--r--weed/util/fasthttp_util.go115
-rw-r--r--weed/util/http_util.go2
7 files changed, 121 insertions, 6 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 351efd3dc..c3ca6370a 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -293,7 +293,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
- bytes, _, err = util.Get(url)
+ bytes, _, err = util.FastGet(url)
if err == nil {
break
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 845bfaec1..99a62c90c 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index 1f78057ef..7a6da3beb 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -35,7 +35,7 @@ func ReadContent(filerAddress string, dir, name string) ([]byte, error) {
target := fmt.Sprintf("http://%s%s/%s", filerAddress, dir, name)
- data, _, err := util.Get(target)
+ data, _, err := util.FastGet(target)
return data, err
}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index f0042a0ff..075204b79 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -181,7 +181,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
index c5b8ab4e1..3514c6977 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_utli.go
@@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/util/fasthttp_util.go b/weed/util/fasthttp_util.go
new file mode 100644
index 000000000..59fde37d0
--- /dev/null
+++ b/weed/util/fasthttp_util.go
@@ -0,0 +1,115 @@
+package util
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/valyala/fasthttp"
+ "sync"
+ "time"
+)
+
+var (
+ fastClient = &fasthttp.Client{
+ NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp
+ MaxConnsPerHost: 1024,
+ ReadBufferSize: 4096, // Make sure to set this big enough that your whole request can be read at once.
+ WriteBufferSize: 64 * 1024, // Same but for your response.
+ ReadTimeout: time.Second,
+ WriteTimeout: time.Second,
+ MaxIdleConnDuration: time.Minute,
+ DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this.
+ }
+
+ // Put everything in pools to prevent garbage.
+ bytesPool = sync.Pool{
+ New: func() interface{} {
+ b := make([]byte, 0)
+ return &b
+ },
+ }
+
+ responsePool = sync.Pool{
+ New: func() interface{} {
+ return make(chan *fasthttp.Response)
+ },
+ }
+)
+
+func FastGet(url string) ([]byte, bool, error) {
+
+ req := fasthttp.AcquireRequest()
+ res := fasthttp.AcquireResponse()
+ defer fasthttp.ReleaseRequest(req)
+ defer fasthttp.ReleaseResponse(res)
+
+ req.SetRequestURIBytes([]byte(url))
+ req.Header.Add("Accept-Encoding", "gzip")
+
+ err := fastClient.Do(req, res)
+ if err != nil {
+ return nil, true, err
+ }
+
+ var data []byte
+ contentEncoding := res.Header.Peek("Content-Encoding")
+ if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
+ data, err = res.BodyGunzip()
+ } else {
+ data = res.Body()
+ }
+
+ out := make([]byte, len(data))
+ copy(out, data)
+
+ if res.StatusCode() >= 400 {
+ retryable := res.StatusCode() >= 500
+ return nil, retryable, fmt.Errorf("%s: %s", url, res.StatusCode())
+ }
+ if err != nil {
+ return nil, false, err
+ }
+ return out, false, nil
+}
+
+func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+
+ if cipherKey != nil {
+ return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ }
+
+ req := fasthttp.AcquireRequest()
+ res := fasthttp.AcquireResponse()
+ defer fasthttp.ReleaseRequest(req)
+ defer fasthttp.ReleaseResponse(res)
+
+ req.SetRequestURIBytes([]byte(fileUrl))
+
+ if isFullChunk {
+ req.Header.Add("Accept-Encoding", "gzip")
+ } else {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
+ }
+
+ if err = fastClient.Do(req, res); err != nil {
+ return true, err
+ }
+
+ if res.StatusCode() >= 400 {
+ retryable = res.StatusCode() >= 500
+ return retryable, fmt.Errorf("%s: %s", fileUrl, res.StatusCode())
+ }
+
+ contentEncoding := res.Header.Peek("Content-Encoding")
+ if bytes.Compare(contentEncoding, []byte("gzip")) == 0 {
+ bodyData, err := res.BodyGunzip()
+ if err != nil {
+ return false, err
+ }
+ fn(bodyData)
+ } else {
+ fn(res.Body())
+ }
+
+ return false, nil
+
+}
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 135d10c45..eff282bab 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -313,7 +313,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
}
func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
- encryptedData, retryable, err := Get(fileUrl)
+ encryptedData, retryable, err := FastGet(fileUrl)
if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}