aboutsummaryrefslogtreecommitdiff
path: root/weed/util/http_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/http_util.go')
-rw-r--r--weed/util/http_util.go156
1 files changed, 118 insertions, 38 deletions
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 21e0a678d..c67eb3276 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -11,9 +11,8 @@ import (
"net/http"
"net/url"
"strings"
- "time"
- "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
var (
@@ -27,23 +26,22 @@ func init() {
}
client = &http.Client{
Transport: Transport,
- Timeout: 5 * time.Second,
}
}
func PostBytes(url string, body []byte) ([]byte, error) {
- r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
+ r, err := client.Post(url, "", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("Post to %s: %v", url, err)
}
defer r.Body.Close()
- if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, r.Status)
- }
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("Read response body: %v", err)
}
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", url, r.Status)
+ }
return b, nil
}
@@ -90,14 +88,14 @@ func Head(url string) (http.Header, error) {
if err != nil {
return nil, err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
return r.Header, nil
}
-func Delete(url string, jwt security.EncodedJwt) error {
+func Delete(url string, jwt string) error {
req, err := http.NewRequest("DELETE", url, nil)
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
@@ -119,7 +117,7 @@ func Delete(url string, jwt security.EncodedJwt) error {
return nil
}
m := make(map[string]interface{})
- if e := json.Unmarshal(body, m); e == nil {
+ if e := json.Unmarshal(body, &m); e == nil {
if s, ok := m["error"].(string); ok {
return errors.New(s)
}
@@ -132,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -155,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -191,11 +189,22 @@ func NormalizeUrl(url string) string {
return "http://" + url
}
-func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (n int64, e error) {
+func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+
+ if cipherKey != nil {
+ var n int
+ err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ n = copy(buf, data)
+ })
+ return int64(n), err
+ }
- req, _ := http.NewRequest("GET", fileUrl, nil)
- if isReadRange {
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return 0, err
+ }
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
} else {
req.Header.Set("Accept-Encoding", "gzip")
}
@@ -211,7 +220,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
}
var reader io.ReadCloser
- switch r.Header.Get("Content-Encoding") {
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
defer reader.Close()
@@ -219,55 +229,125 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
reader = r.Body
}
- var i, m int
+ var (
+ i, m int
+ n int64
+ )
+ // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
+ // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
for {
m, err = reader.Read(buf[i:])
- if m == 0 {
- return
- }
i += m
n += int64(m)
if err == io.EOF {
return n, nil
}
- if e != nil {
- return n, e
+ if err != nil {
+ return n, err
+ }
+ if n == int64(len(buf)) {
+ break
}
}
-
+ // drains the response body to avoid memory leak
+ data, _ := ioutil.ReadAll(reader)
+ if len(data) != 0 {
+ glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
+ }
+ return n, err
}
-func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (n int64, e error) {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+
+ if cipherKey != nil {
+ return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ }
+
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return err
+ }
- req, _ := http.NewRequest("GET", fileUrl, nil)
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
+ }
r, err := client.Do(req)
if err != nil {
- return 0, err
+ return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
- return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ return fmt.Errorf("%s: %s", fileUrl, r.Status)
}
- var m int
+ var (
+ m int
+ )
buf := make([]byte, 64*1024)
for {
m, err = r.Body.Read(buf)
- if m == 0 {
- return
- }
fn(buf[:m])
- n += int64(m)
if err == io.EOF {
- return n, nil
+ return nil
+ }
+ if err != nil {
+ return err
}
- if e != nil {
- return n, e
+ }
+
+}
+
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+ encryptedData, err := Get(fileUrl)
+ if err != nil {
+ return fmt.Errorf("fetch %s: %v", fileUrl, err)
+ }
+ decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
+ if err != nil {
+ return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ }
+ if isContentCompressed {
+ decryptedData, err = DecompressData(decryptedData)
+ if err != nil {
+ return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err)
}
}
+ if len(decryptedData) < int(offset)+size {
+ return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ }
+ if isFullChunk {
+ fn(decryptedData)
+ } else {
+ fn(decryptedData[int(offset) : int(offset)+size])
+ }
+ return nil
+}
+
+func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
+
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+ if rangeHeader != "" {
+ req.Header.Add("Range", rangeHeader)
+ }
+
+ r, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ }
+
+ return r.Body, nil
+}
+func CloseResponse(resp *http.Response) {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
}