diff options
Diffstat (limited to 'weed/util/http_util.go')
| -rw-r--r-- | weed/util/http_util.go | 156 |
1 files changed, 118 insertions, 38 deletions
diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 21e0a678d..c67eb3276 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -11,9 +11,8 @@ import ( "net/http" "net/url" "strings" - "time" - "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/glog" ) var ( @@ -27,23 +26,22 @@ func init() { } client = &http.Client{ Transport: Transport, - Timeout: 5 * time.Second, } } func PostBytes(url string, body []byte) ([]byte, error) { - r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) + r, err := client.Post(url, "", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("Post to %s: %v", url, err) } defer r.Body.Close() - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) - } b, err := ioutil.ReadAll(r.Body) if err != nil { return nil, fmt.Errorf("Read response body: %v", err) } + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } return b, nil } @@ -90,14 +88,14 @@ func Head(url string) (http.Header, error) { if err != nil { return nil, err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", url, r.Status) } return r.Header, nil } -func Delete(url string, jwt security.EncodedJwt) error { +func Delete(url string, jwt string) error { req, err := http.NewRequest("DELETE", url, nil) if jwt != "" { req.Header.Set("Authorization", "BEARER "+string(jwt)) @@ -119,7 +117,7 @@ func Delete(url string, jwt security.EncodedJwt) error { return nil } m := make(map[string]interface{}) - if e := json.Unmarshal(body, m); e == nil { + if e := json.Unmarshal(body, &m); e == nil { if s, ok := m["error"].(string); ok { return errors.New(s) } @@ -132,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -155,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -191,11 +189,22 @@ func NormalizeUrl(url string) string { return "http://" + url } -func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (n int64, e error) { +func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { + + if cipherKey != nil { + var n int + err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + n = copy(buf, data) + }) + return int64(n), err + } - req, _ := http.NewRequest("GET", fileUrl, nil) - if isReadRange { - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return 0, err + } + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } else { req.Header.Set("Accept-Encoding", "gzip") } @@ -211,7 +220,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo } var reader io.ReadCloser - switch r.Header.Get("Content-Encoding") { + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { case "gzip": reader, err = gzip.NewReader(r.Body) defer reader.Close() @@ -219,55 +229,125 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo reader = r.Body } - var i, m int + var ( + i, m int + n int64 + ) + // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199 + // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318 for { m, err = reader.Read(buf[i:]) - if m == 0 { - return - } i += m n += int64(m) if err == io.EOF { return n, nil } - if e != nil { - return n, e + if err != nil { + return n, err + } + if n == int64(len(buf)) { + break } } - + // drains the response body to avoid memory leak + data, _ := ioutil.ReadAll(reader) + if len(data) != 0 { + glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data)) + } + return n, err } -func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (n int64, e error) { +func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { + + if cipherKey != nil { + return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + } + + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return err + } - req, _ := http.NewRequest("GET", fileUrl, nil) - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) + } r, err := client.Do(req) if err != nil { - return 0, err + return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { - return 0, fmt.Errorf("%s: %s", fileUrl, r.Status) + return fmt.Errorf("%s: %s", fileUrl, r.Status) } - var m int + var ( + m int + ) buf := make([]byte, 64*1024) for { m, err = r.Body.Read(buf) - if m == 0 { - return - } fn(buf[:m]) - n += int64(m) if err == io.EOF { - return n, nil + return nil + } + if err != nil { + return err } - if e != nil { - return n, e + } + +} + +func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { + encryptedData, err := Get(fileUrl) + if err != nil { + return fmt.Errorf("fetch %s: %v", fileUrl, err) + } + decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) + if err != nil { + return fmt.Errorf("decrypt %s: %v", fileUrl, err) + } + if isContentCompressed { + decryptedData, err = DecompressData(decryptedData) + if err != nil { + return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err) } } + if len(decryptedData) < int(offset)+size { + return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size) + } + if isFullChunk { + fn(decryptedData) + } else { + fn(decryptedData[int(offset) : int(offset)+size]) + } + return nil +} + +func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) { + + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return nil, err + } + if rangeHeader != "" { + req.Header.Add("Range", rangeHeader) + } + + r, err := client.Do(req) + if err != nil { + return nil, err + } + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", fileUrl, r.Status) + } + + return r.Body, nil +} +func CloseResponse(resp *http.Response) { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() } |
