diff options
Diffstat (limited to 'weed/util/http_util.go')
| -rw-r--r-- | weed/util/http_util.go | 214 |
1 files changed, 166 insertions, 48 deletions
diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 667d0b4be..1630760b1 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -1,7 +1,6 @@ package util import ( - "bytes" "compress/gzip" "encoding/json" "errors" @@ -11,6 +10,8 @@ import ( "net/http" "net/url" "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" ) var ( @@ -20,6 +21,7 @@ var ( func init() { Transport = &http.Transport{ + MaxIdleConns: 1024, MaxIdleConnsPerHost: 1024, } client = &http.Client{ @@ -27,22 +29,6 @@ func init() { } } -func PostBytes(url string, body []byte) ([]byte, error) { - r, err := client.Post(url, "", bytes.NewReader(body)) - if err != nil { - return nil, fmt.Errorf("Post to %s: %v", url, err) - } - defer r.Body.Close() - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) - } - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return nil, fmt.Errorf("Read response body: %v", err) - } - return b, nil -} - func Post(url string, values url.Values) ([]byte, error) { r, err := client.PostForm(url, values) if err != nil { @@ -65,20 +51,35 @@ func Post(url string, values url.Values) ([]byte, error) { // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go // may need increasing http.Client.Timeout -func Get(url string) ([]byte, error) { - r, err := client.Get(url) +func Get(url string) ([]byte, bool, error) { + + request, err := http.NewRequest("GET", url, nil) + request.Header.Add("Accept-Encoding", "gzip") + + response, err := client.Do(request) if err != nil { - return nil, err + return nil, true, err } - defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) + defer response.Body.Close() + + var reader io.ReadCloser + switch response.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(response.Body) + defer reader.Close() + default: + reader = response.Body + } + + b, err := ioutil.ReadAll(reader) + if response.StatusCode >= 400 { + retryable := response.StatusCode >= 500 + return nil, retryable, fmt.Errorf("%s: %s", url, response.Status) } if err != nil { - return nil, err + return nil, false, err } - return b, nil + return b, false, nil } func Head(url string) (http.Header, error) { @@ -86,7 +87,7 @@ func Head(url string) (http.Header, error) { if err != nil { return nil, err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", url, r.Status) } @@ -115,7 +116,7 @@ func Delete(url string, jwt string) error { return nil } m := make(map[string]interface{}) - if e := json.Unmarshal(body, m); e == nil { + if e := json.Unmarshal(body, &m); e == nil { if s, ok := m["error"].(string); ok { return errors.New(s) } @@ -123,12 +124,33 @@ func Delete(url string, jwt string) error { return errors.New(string(body)) } +func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) { + req, err := http.NewRequest("DELETE", url, nil) + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + if err != nil { + return + } + resp, err := client.Do(req) + if err != nil { + return + } + defer resp.Body.Close() + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return + } + httpStatus = resp.StatusCode + return +} + func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { r, err := client.PostForm(url, values) if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -151,14 +173,14 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } return readFn(r.Body) } -func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) { +func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) { response, err := client.Get(fileUrl) if err != nil { return "", nil, nil, err @@ -172,7 +194,7 @@ func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.Re filename = strings.Trim(filename, "\"") } } - rc = response.Body + resp = response return } @@ -187,14 +209,22 @@ func NormalizeUrl(url string) string { return "http://" + url } -func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (int64, error) { +func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { + + if cipherKey != nil { + var n int + _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + n = copy(buf, data) + }) + return int64(n), err + } req, err := http.NewRequest("GET", fileUrl, nil) if err != nil { return 0, err } - if isReadRange { - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } else { req.Header.Set("Accept-Encoding", "gzip") } @@ -210,7 +240,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo } var reader io.ReadCloser - switch r.Header.Get("Content-Encoding") { + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { case "gzip": reader, err = gzip.NewReader(r.Body) defer reader.Close() @@ -242,44 +273,131 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo // drains the response body to avoid memory leak data, _ := ioutil.ReadAll(reader) if len(data) != 0 { - err = fmt.Errorf("buffer size is too small. remains %d", len(data)) + glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data)) } return n, err } -func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (int64, error) { +func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { + + if cipherKey != nil { + return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + } req, err := http.NewRequest("GET", fileUrl, nil) if err != nil { - return 0, err + return false, err + } + + if isFullChunk { + req.Header.Add("Accept-Encoding", "gzip") + } else { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) r, err := client.Do(req) if err != nil { - return 0, err + return true, err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { - return 0, fmt.Errorf("%s: %s", fileUrl, r.Status) + retryable = r.StatusCode >= 500 + return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status) + } + + var reader io.ReadCloser + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + reader, err = gzip.NewReader(r.Body) + defer reader.Close() + default: + reader = r.Body } var ( m int - n int64 ) buf := make([]byte, 64*1024) for { - m, err = r.Body.Read(buf) + m, err = reader.Read(buf) fn(buf[:m]) - n += int64(m) if err == io.EOF { - return n, nil + return false, nil } if err != nil { - return n, err + return false, err } } } + +func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { + encryptedData, retryable, err := Get(fileUrl) + if err != nil { + return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) + } + decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) + if err != nil { + return false, fmt.Errorf("decrypt %s: %v", fileUrl, err) + } + if isContentCompressed { + decryptedData, err = DecompressData(decryptedData) + if err != nil { + glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err) + } + } + if len(decryptedData) < int(offset)+size { + return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size) + } + if isFullChunk { + fn(decryptedData) + } else { + fn(decryptedData[int(offset) : int(offset)+size]) + } + return false, nil +} + +func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) { + + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return nil, err + } + if rangeHeader != "" { + req.Header.Add("Range", rangeHeader) + } else { + req.Header.Add("Accept-Encoding", "gzip") + } + + r, err := client.Do(req) + if err != nil { + return nil, err + } + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", fileUrl, r.Status) + } + + var reader io.ReadCloser + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + reader, err = gzip.NewReader(r.Body) + defer reader.Close() + default: + reader = r.Body + } + + return reader, nil +} + +func CloseResponse(resp *http.Response) { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() +} + +func CloseRequest(req *http.Request) { + io.Copy(ioutil.Discard, req.Body) + req.Body.Close() +} |
