diff options
| author | HongyanShen <763987993@qq.com> | 2020-03-11 12:55:24 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-11 12:55:24 +0800 |
| commit | 03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch) | |
| tree | ed8833386a712c850dcef0815509774681a6ab56 /weed/util/http_util.go | |
| parent | 0fca1ae776783b37481549df40f477b7d9248d3c (diff) | |
| parent | 60f5f05c78a2918d5219c925cea5847759281a2c (diff) | |
| download | seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip | |
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/util/http_util.go')
| -rw-r--r-- | weed/util/http_util.go | 146 |
1 files changed, 113 insertions, 33 deletions
diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 79a442a56..750516b92 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -11,6 +11,8 @@ import ( "net/http" "net/url" "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" ) var ( @@ -28,18 +30,18 @@ func init() { } func PostBytes(url string, body []byte) ([]byte, error) { - r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) + r, err := client.Post(url, "", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("Post to %s: %v", url, err) } defer r.Body.Close() - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) - } b, err := ioutil.ReadAll(r.Body) if err != nil { return nil, fmt.Errorf("Read response body: %v", err) } + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } return b, nil } @@ -86,7 +88,7 @@ func Head(url string) (http.Header, error) { if err != nil { return nil, err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", url, r.Status) } @@ -128,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -151,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -187,11 +189,22 @@ func NormalizeUrl(url string) string { return "http://" + url } -func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (n int64, e error) { +func ReadUrl(fileUrl string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { + + if cipherKey != nil { + var n int + err := readEncryptedUrl(fileUrl, cipherKey, isGzipped, offset, size, func(data []byte) { + n = copy(buf, data) + }) + return int64(n), err + } - req, _ := http.NewRequest("GET", fileUrl, nil) - if isReadRange { - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return 0, err + } + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } else { req.Header.Set("Accept-Encoding", "gzip") } @@ -207,7 +220,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo } var reader io.ReadCloser - switch r.Header.Get("Content-Encoding") { + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { case "gzip": reader, err = gzip.NewReader(r.Body) defer reader.Close() @@ -215,55 +229,121 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo reader = r.Body } - var i, m int + var ( + i, m int + n int64 + ) + // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199 + // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318 for { m, err = reader.Read(buf[i:]) - if m == 0 { - return - } i += m n += int64(m) if err == io.EOF { return n, nil } - if e != nil { - return n, e + if err != nil { + return n, err + } + if n == int64(len(buf)) { + break } } - + // drains the response body to avoid memory leak + data, _ := ioutil.ReadAll(reader) + if len(data) != 0 { + glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data)) + } + return n, err } -func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (n int64, e error) { +func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { + + if cipherKey != nil { + return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, offset, size, fn) + } + + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return err + } - req, _ := http.NewRequest("GET", fileUrl, nil) - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) + } r, err := client.Do(req) if err != nil { - return 0, err + return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { - return 0, fmt.Errorf("%s: %s", fileUrl, r.Status) + return fmt.Errorf("%s: %s", fileUrl, r.Status) } - var m int + var ( + m int + ) buf := make([]byte, 64*1024) for { m, err = r.Body.Read(buf) - if m == 0 { - return - } fn(buf[:m]) - n += int64(m) if err == io.EOF { - return n, nil + return nil + } + if err != nil { + return err } - if e != nil { - return n, e + } + +} + +func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentGzipped bool, offset int64, size int, fn func(data []byte)) error { + encryptedData, err := Get(fileUrl) + if err != nil { + return fmt.Errorf("fetch %s: %v", fileUrl, err) + } + decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) + if err != nil { + return fmt.Errorf("decrypt %s: %v", fileUrl, err) + } + if isContentGzipped { + decryptedData, err = UnGzipData(decryptedData) + if err != nil { + return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err) } } + if len(decryptedData) < int(offset)+size { + return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size) + } + fn(decryptedData[int(offset) : int(offset)+size]) + return nil +} + +func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) { + + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return nil, err + } + if rangeHeader != "" { + req.Header.Add("Range", rangeHeader) + } + + r, err := client.Do(req) + if err != nil { + return nil, err + } + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", fileUrl, r.Status) + } + + return r.Body, nil +} +func CloseResponse(resp *http.Response) { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() } |
