aboutsummaryrefslogtreecommitdiff
path: root/weed/util/http_util.go
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/util/http_util.go
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/util/http_util.go')
-rw-r--r--weed/util/http_util.go214
1 files changed, 166 insertions, 48 deletions
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 667d0b4be..1630760b1 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -1,7 +1,6 @@
package util
import (
- "bytes"
"compress/gzip"
"encoding/json"
"errors"
@@ -11,6 +10,8 @@ import (
"net/http"
"net/url"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
var (
@@ -20,6 +21,7 @@ var (
func init() {
Transport = &http.Transport{
+ MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}
client = &http.Client{
@@ -27,22 +29,6 @@ func init() {
}
}
-func PostBytes(url string, body []byte) ([]byte, error) {
- r, err := client.Post(url, "", bytes.NewReader(body))
- if err != nil {
- return nil, fmt.Errorf("Post to %s: %v", url, err)
- }
- defer r.Body.Close()
- if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, r.Status)
- }
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- return nil, fmt.Errorf("Read response body: %v", err)
- }
- return b, nil
-}
-
func Post(url string, values url.Values) ([]byte, error) {
r, err := client.PostForm(url, values)
if err != nil {
@@ -65,20 +51,35 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout
-func Get(url string) ([]byte, error) {
- r, err := client.Get(url)
+func Get(url string) ([]byte, bool, error) {
+
+ request, err := http.NewRequest("GET", url, nil)
+ request.Header.Add("Accept-Encoding", "gzip")
+
+ response, err := client.Do(request)
if err != nil {
- return nil, err
+ return nil, true, err
}
- defer r.Body.Close()
- b, err := ioutil.ReadAll(r.Body)
- if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, r.Status)
+ defer response.Body.Close()
+
+ var reader io.ReadCloser
+ switch response.Header.Get("Content-Encoding") {
+ case "gzip":
+ reader, err = gzip.NewReader(response.Body)
+ defer reader.Close()
+ default:
+ reader = response.Body
+ }
+
+ b, err := ioutil.ReadAll(reader)
+ if response.StatusCode >= 400 {
+ retryable := response.StatusCode >= 500
+ return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
}
if err != nil {
- return nil, err
+ return nil, false, err
}
- return b, nil
+ return b, false, nil
}
func Head(url string) (http.Header, error) {
@@ -86,7 +87,7 @@ func Head(url string) (http.Header, error) {
if err != nil {
return nil, err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
@@ -115,7 +116,7 @@ func Delete(url string, jwt string) error {
return nil
}
m := make(map[string]interface{})
- if e := json.Unmarshal(body, m); e == nil {
+ if e := json.Unmarshal(body, &m); e == nil {
if s, ok := m["error"].(string); ok {
return errors.New(s)
}
@@ -123,12 +124,33 @@ func Delete(url string, jwt string) error {
return errors.New(string(body))
}
+func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
+ req, err := http.NewRequest("DELETE", url, nil)
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+ if err != nil {
+ return
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return
+ }
+ defer resp.Body.Close()
+ body, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return
+ }
+ httpStatus = resp.StatusCode
+ return
+}
+
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
r, err := client.PostForm(url, values)
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -151,14 +173,14 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
return readFn(r.Body)
}
-func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) {
+func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) {
response, err := client.Get(fileUrl)
if err != nil {
return "", nil, nil, err
@@ -172,7 +194,7 @@ func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.Re
filename = strings.Trim(filename, "\"")
}
}
- rc = response.Body
+ resp = response
return
}
@@ -187,14 +209,22 @@ func NormalizeUrl(url string) string {
return "http://" + url
}
-func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (int64, error) {
+func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+
+ if cipherKey != nil {
+ var n int
+ _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ n = copy(buf, data)
+ })
+ return int64(n), err
+ }
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
return 0, err
}
- if isReadRange {
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
} else {
req.Header.Set("Accept-Encoding", "gzip")
}
@@ -210,7 +240,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
}
var reader io.ReadCloser
- switch r.Header.Get("Content-Encoding") {
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
defer reader.Close()
@@ -242,44 +273,131 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
// drains the response body to avoid memory leak
data, _ := ioutil.ReadAll(reader)
if len(data) != 0 {
- err = fmt.Errorf("buffer size is too small. remains %d", len(data))
+ glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
}
return n, err
}
-func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (int64, error) {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+
+ if cipherKey != nil {
+ return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ }
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
- return 0, err
+ return false, err
+ }
+
+ if isFullChunk {
+ req.Header.Add("Accept-Encoding", "gzip")
+ } else {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
r, err := client.Do(req)
if err != nil {
- return 0, err
+ return true, err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
- return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ retryable = r.StatusCode >= 500
+ return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ }
+
+ var reader io.ReadCloser
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
+ case "gzip":
+ reader, err = gzip.NewReader(r.Body)
+ defer reader.Close()
+ default:
+ reader = r.Body
}
var (
m int
- n int64
)
buf := make([]byte, 64*1024)
for {
- m, err = r.Body.Read(buf)
+ m, err = reader.Read(buf)
fn(buf[:m])
- n += int64(m)
if err == io.EOF {
- return n, nil
+ return false, nil
}
if err != nil {
- return n, err
+ return false, err
}
}
}
+
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+ encryptedData, retryable, err := Get(fileUrl)
+ if err != nil {
+ return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
+ }
+ decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
+ if err != nil {
+ return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ }
+ if isContentCompressed {
+ decryptedData, err = DecompressData(decryptedData)
+ if err != nil {
+ glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
+ }
+ }
+ if len(decryptedData) < int(offset)+size {
+ return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ }
+ if isFullChunk {
+ fn(decryptedData)
+ } else {
+ fn(decryptedData[int(offset) : int(offset)+size])
+ }
+ return false, nil
+}
+
+func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
+
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+ if rangeHeader != "" {
+ req.Header.Add("Range", rangeHeader)
+ } else {
+ req.Header.Add("Accept-Encoding", "gzip")
+ }
+
+ r, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ }
+
+ var reader io.ReadCloser
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
+ case "gzip":
+ reader, err = gzip.NewReader(r.Body)
+ defer reader.Close()
+ default:
+ reader = r.Body
+ }
+
+ return reader, nil
+}
+
+func CloseResponse(resp *http.Response) {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+}
+
+func CloseRequest(req *http.Request) {
+ io.Copy(ioutil.Discard, req.Body)
+ req.Body.Close()
+}