aboutsummaryrefslogtreecommitdiff
path: root/weed/util/http_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/http_util.go')
-rw-r--r--weed/util/http_util.go146
1 files changed, 113 insertions, 33 deletions
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 79a442a56..750516b92 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -11,6 +11,8 @@ import (
"net/http"
"net/url"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
var (
@@ -28,18 +30,18 @@ func init() {
}
func PostBytes(url string, body []byte) ([]byte, error) {
- r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
+ r, err := client.Post(url, "", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("Post to %s: %v", url, err)
}
defer r.Body.Close()
- if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, r.Status)
- }
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("Read response body: %v", err)
}
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", url, r.Status)
+ }
return b, nil
}
@@ -86,7 +88,7 @@ func Head(url string) (http.Header, error) {
if err != nil {
return nil, err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
@@ -128,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -151,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -187,11 +189,22 @@ func NormalizeUrl(url string) string {
return "http://" + url
}
-func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (n int64, e error) {
+func ReadUrl(fileUrl string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+
+ if cipherKey != nil {
+ var n int
+ err := readEncryptedUrl(fileUrl, cipherKey, isGzipped, offset, size, func(data []byte) {
+ n = copy(buf, data)
+ })
+ return int64(n), err
+ }
- req, _ := http.NewRequest("GET", fileUrl, nil)
- if isReadRange {
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return 0, err
+ }
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
} else {
req.Header.Set("Accept-Encoding", "gzip")
}
@@ -207,7 +220,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
}
var reader io.ReadCloser
- switch r.Header.Get("Content-Encoding") {
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
defer reader.Close()
@@ -215,55 +229,121 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
reader = r.Body
}
- var i, m int
+ var (
+ i, m int
+ n int64
+ )
+ // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
+ // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
for {
m, err = reader.Read(buf[i:])
- if m == 0 {
- return
- }
i += m
n += int64(m)
if err == io.EOF {
return n, nil
}
- if e != nil {
- return n, e
+ if err != nil {
+ return n, err
+ }
+ if n == int64(len(buf)) {
+ break
}
}
-
+ // drains the response body to avoid memory leak
+ data, _ := ioutil.ReadAll(reader)
+ if len(data) != 0 {
+ glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
+ }
+ return n, err
}
-func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (n int64, e error) {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+
+ if cipherKey != nil {
+ return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, offset, size, fn)
+ }
+
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return err
+ }
- req, _ := http.NewRequest("GET", fileUrl, nil)
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
+ }
r, err := client.Do(req)
if err != nil {
- return 0, err
+ return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
- return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ return fmt.Errorf("%s: %s", fileUrl, r.Status)
}
- var m int
+ var (
+ m int
+ )
buf := make([]byte, 64*1024)
for {
m, err = r.Body.Read(buf)
- if m == 0 {
- return
- }
fn(buf[:m])
- n += int64(m)
if err == io.EOF {
- return n, nil
+ return nil
+ }
+ if err != nil {
+ return err
}
- if e != nil {
- return n, e
+ }
+
+}
+
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentGzipped bool, offset int64, size int, fn func(data []byte)) error {
+ encryptedData, err := Get(fileUrl)
+ if err != nil {
+ return fmt.Errorf("fetch %s: %v", fileUrl, err)
+ }
+ decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
+ if err != nil {
+ return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ }
+ if isContentGzipped {
+ decryptedData, err = UnGzipData(decryptedData)
+ if err != nil {
+ return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err)
}
}
+ if len(decryptedData) < int(offset)+size {
+ return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ }
+ fn(decryptedData[int(offset) : int(offset)+size])
+ return nil
+}
+
+func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
+
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+ if rangeHeader != "" {
+ req.Header.Add("Range", rangeHeader)
+ }
+
+ r, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ }
+
+ return r.Body, nil
+}
+func CloseResponse(resp *http.Response) {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
}