diff options
| author | vadimartynov <166398828+vadimartynov@users.noreply.github.com> | 2024-07-17 09:14:09 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-16 23:14:09 -0700 |
| commit | 86d92a42b4861d4bb05c58fea9db84d960995545 (patch) | |
| tree | b3b8cefc07fe3d10c0dc0c69120a9019584bd60a /weed/util/http | |
| parent | c6dec11ea556b8be648f372dfa5cbd074c9f631b (diff) | |
| download | seaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.tar.xz seaweedfs-86d92a42b4861d4bb05c58fea9db84d960995545.zip | |
Added tls for http clients (#5766)
* Added global http client
* Added Do func for global http client
* Changed the code to use the global http client
* Fix http client in volume uploader
* Fixed pkg name
* Fixed http util funcs
* Fixed http client for bench_filer_upload
* Fixed http client for stress_filer_upload
* Fixed http client for filer_server_handlers_proxy
* Fixed http client for command_fs_merge_volumes
* Fixed http client for command_fs_merge_volumes and command_volume_fsck
* Fixed http client for s3api_server
* Added init global client for main funcs
* Rename global_client to client
* Changed:
- fixed NewHttpClient;
- added CheckIsHttpsClientEnabled func
- updated security.toml in scaffold
* Reduce the visibility of some functions in the util/http/client pkg
* Added the loadSecurityConfig function
* Use util.LoadSecurityConfiguration() in NewHttpClient func
Diffstat (limited to 'weed/util/http')
| -rw-r--r-- | weed/util/http/client/http_client.go | 201 | ||||
| -rw-r--r-- | weed/util/http/client/http_client_interface.go | 16 | ||||
| -rw-r--r-- | weed/util/http/client/http_client_name.go | 14 | ||||
| -rw-r--r-- | weed/util/http/client/http_client_name_string.go | 23 | ||||
| -rw-r--r-- | weed/util/http/client/http_client_opt.go | 18 | ||||
| -rw-r--r-- | weed/util/http/http_global_client_init.go | 27 | ||||
| -rw-r--r-- | weed/util/http/http_global_client_util.go | 480 |
7 files changed, 779 insertions, 0 deletions
diff --git a/weed/util/http/client/http_client.go b/weed/util/http/client/http_client.go new file mode 100644 index 000000000..d1d2f5c56 --- /dev/null +++ b/weed/util/http/client/http_client.go @@ -0,0 +1,201 @@ +package client + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + util "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/spf13/viper" + "io" + "net/http" + "net/url" + "os" + "strings" + "sync" +) + +var ( + loadSecurityConfigOnce sync.Once +) + +type HTTPClient struct { + Client *http.Client + Transport *http.Transport + expectHttpsScheme bool +} + +func (httpClient *HTTPClient) Do(req *http.Request) (*http.Response, error) { + req.URL.Scheme = httpClient.GetHttpScheme() + return httpClient.Client.Do(req) +} + +func (httpClient *HTTPClient) Get(url string) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.Get(url) +} + +func (httpClient *HTTPClient) Post(url, contentType string, body io.Reader) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.Post(url, contentType, body) +} + +func (httpClient *HTTPClient) PostForm(url string, data url.Values) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.PostForm(url, data) +} + +func (httpClient *HTTPClient) Head(url string) (resp *http.Response, err error) { + url, err = httpClient.NormalizeHttpScheme(url) + if err != nil { + return nil, err + } + return httpClient.Client.Head(url) +} +func (httpClient *HTTPClient) CloseIdleConnections() { + httpClient.Client.CloseIdleConnections() +} + +func (httpClient *HTTPClient) GetClientTransport() *http.Transport { + return httpClient.Transport +} + +func (httpClient *HTTPClient) GetHttpScheme() string { + if httpClient.expectHttpsScheme { + return "https" + } + return "http" +} + +func (httpClient *HTTPClient) NormalizeHttpScheme(rawURL string) (string, error) { + expectedScheme := httpClient.GetHttpScheme() + + if !(strings.HasPrefix(rawURL, "http://") || strings.HasPrefix(rawURL, "https://")) { + return expectedScheme + "://" + rawURL, nil + } + + parsedURL, err := url.Parse(rawURL) + if err != nil { + return "", err + } + + if expectedScheme != parsedURL.Scheme { + parsedURL.Scheme = expectedScheme + } + return parsedURL.String(), nil +} + +func NewHttpClient(clientName ClientName, opts ...HttpClientOpt) (*HTTPClient, error) { + httpClient := HTTPClient{} + httpClient.expectHttpsScheme = checkIsHttpsClientEnabled(clientName) + var tlsConfig *tls.Config = nil + + if httpClient.expectHttpsScheme { + clientCertPair, err := getClientCertPair(clientName) + if err != nil { + return nil, err + } + + clientCaCert, clientCaCertName, err := getClientCaCert(clientName) + if err != nil { + return nil, err + } + + if clientCertPair != nil || len(clientCaCert) != 0 { + caCertPool, err := createHTTPClientCertPool(clientCaCert, clientCaCertName) + if err != nil { + return nil, err + } + + tlsConfig = &tls.Config{ + Certificates: []tls.Certificate{}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + } + + if clientCertPair != nil { + tlsConfig.Certificates = append(tlsConfig.Certificates, *clientCertPair) + } + } + } + + httpClient.Transport = &http.Transport{ + MaxIdleConns: 1024, + MaxIdleConnsPerHost: 1024, + TLSClientConfig: tlsConfig, + } + httpClient.Client = &http.Client{ + Transport: httpClient.Transport, + } + + for _, opt := range opts { + opt(&httpClient) + } + return &httpClient, nil +} + +func getStringOptionFromSecurityConfiguration(clientName ClientName, stringOptionName string) string { + util.LoadSecurityConfiguration() + return viper.GetString(fmt.Sprintf("https.%s.%s", clientName.LowerCaseString(), stringOptionName)) +} + +func getBoolOptionFromSecurityConfiguration(clientName ClientName, boolOptionName string) bool { + util.LoadSecurityConfiguration() + return viper.GetBool(fmt.Sprintf("https.%s.%s", clientName.LowerCaseString(), boolOptionName)) +} + +func checkIsHttpsClientEnabled(clientName ClientName) bool { + return getBoolOptionFromSecurityConfiguration(clientName, "enabled") +} + +func getFileContentFromSecurityConfiguration(clientName ClientName, fileType string) ([]byte, string, error) { + if fileName := getStringOptionFromSecurityConfiguration(clientName, fileType); fileName != "" { + fileContent, err := os.ReadFile(fileName) + if err != nil { + return nil, fileName, err + } + return fileContent, fileName, err + } + return nil, "", nil +} + +func getClientCertPair(clientName ClientName) (*tls.Certificate, error) { + certFileName := getStringOptionFromSecurityConfiguration(clientName, "cert") + keyFileName := getStringOptionFromSecurityConfiguration(clientName, "key") + if certFileName == "" && keyFileName == "" { + return nil, nil + } + if certFileName != "" && keyFileName != "" { + clientCert, err := tls.LoadX509KeyPair(certFileName, keyFileName) + if err != nil { + return nil, fmt.Errorf("error loading client certificate and key: %s", err) + } + return &clientCert, nil + } + return nil, fmt.Errorf("error loading key pair: key `%s` and certificate `%s`", keyFileName, certFileName) +} + +func getClientCaCert(clientName ClientName) ([]byte, string, error) { + return getFileContentFromSecurityConfiguration(clientName, "ca") +} + +func createHTTPClientCertPool(certContent []byte, fileName string) (*x509.CertPool, error) { + certPool := x509.NewCertPool() + if len(certContent) == 0 { + return certPool, nil + } + + ok := certPool.AppendCertsFromPEM(certContent) + if !ok { + return nil, fmt.Errorf("error processing certificate in %s", fileName) + } + return certPool, nil +} diff --git a/weed/util/http/client/http_client_interface.go b/weed/util/http/client/http_client_interface.go new file mode 100644 index 000000000..7a2d43360 --- /dev/null +++ b/weed/util/http/client/http_client_interface.go @@ -0,0 +1,16 @@ +package client + +import ( + "io" + "net/http" + "net/url" +) + +type HTTPClientInterface interface { + Do(req *http.Request) (*http.Response, error) + Get(url string) (resp *http.Response, err error) + Post(url, contentType string, body io.Reader) (resp *http.Response, err error) + PostForm(url string, data url.Values) (resp *http.Response, err error) + Head(url string) (resp *http.Response, err error) + CloseIdleConnections() +} diff --git a/weed/util/http/client/http_client_name.go b/weed/util/http/client/http_client_name.go new file mode 100644 index 000000000..aedaebbc6 --- /dev/null +++ b/weed/util/http/client/http_client_name.go @@ -0,0 +1,14 @@ +package client + +import "strings" + +type ClientName int + +//go:generate stringer -type=ClientName -output=http_client_name_string.go +const ( + Client ClientName = iota +) + +func (name *ClientName) LowerCaseString() string { + return strings.ToLower(name.String()) +} diff --git a/weed/util/http/client/http_client_name_string.go b/weed/util/http/client/http_client_name_string.go new file mode 100644 index 000000000..652fcdaac --- /dev/null +++ b/weed/util/http/client/http_client_name_string.go @@ -0,0 +1,23 @@ +// Code generated by "stringer -type=ClientName -output=http_client_name_string.go"; DO NOT EDIT. + +package client + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Client-0] +} + +const _ClientName_name = "Client" + +var _ClientName_index = [...]uint8{0, 6} + +func (i ClientName) String() string { + if i < 0 || i >= ClientName(len(_ClientName_index)-1) { + return "ClientName(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _ClientName_name[_ClientName_index[i]:_ClientName_index[i+1]] +} diff --git a/weed/util/http/client/http_client_opt.go b/weed/util/http/client/http_client_opt.go new file mode 100644 index 000000000..1ff9d533d --- /dev/null +++ b/weed/util/http/client/http_client_opt.go @@ -0,0 +1,18 @@ +package client + +import ( + "net" + "time" +) + +type HttpClientOpt = func(clientCfg *HTTPClient) + +func AddDialContext(httpClient *HTTPClient) { + dialContext := (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 10 * time.Second, + }).DialContext + + httpClient.Transport.DialContext = dialContext + httpClient.Client.Transport = httpClient.Transport +} diff --git a/weed/util/http/http_global_client_init.go b/weed/util/http/http_global_client_init.go new file mode 100644 index 000000000..0dcb05cfd --- /dev/null +++ b/weed/util/http/http_global_client_init.go @@ -0,0 +1,27 @@ +package http + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" +) + +var ( + globalHttpClient *util_http_client.HTTPClient +) + +func NewGlobalHttpClient(opt ...util_http_client.HttpClientOpt) (*util_http_client.HTTPClient, error) { + return util_http_client.NewHttpClient(util_http_client.Client, opt...) +} + +func GetGlobalHttpClient() *util_http_client.HTTPClient { + return globalHttpClient +} + +func InitGlobalHttpClient() { + var err error + + globalHttpClient, err = NewGlobalHttpClient() + if err != nil { + glog.Fatalf("error init global http client: %v", err) + } +} diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go new file mode 100644 index 000000000..c3931a790 --- /dev/null +++ b/weed/util/http/http_global_client_util.go @@ -0,0 +1,480 @@ +package http + +import ( + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/util/mem" + "github.com/seaweedfs/seaweedfs/weed/util" + "io" + "net/http" + "net/url" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +func Post(url string, values url.Values) ([]byte, error) { + r, err := GetGlobalHttpClient().PostForm(url, values) + if err != nil { + return nil, err + } + defer r.Body.Close() + b, err := io.ReadAll(r.Body) + if r.StatusCode >= 400 { + if err != nil { + return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b)) + } else { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } + } + if err != nil { + return nil, err + } + return b, nil +} + +// github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go +// may need increasing http.Client.Timeout +func Get(url string) ([]byte, bool, error) { + return GetAuthenticated(url, "") +} + +func GetAuthenticated(url, jwt string) ([]byte, bool, error) { + request, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, true, err + } + maybeAddAuth(request, jwt) + request.Header.Add("Accept-Encoding", "gzip") + + response, err := GetGlobalHttpClient().Do(request) + if err != nil { + return nil, true, err + } + defer CloseResponse(response) + + var reader io.ReadCloser + switch response.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(response.Body) + if err != nil { + return nil, true, err + } + defer reader.Close() + default: + reader = response.Body + } + + b, err := io.ReadAll(reader) + if response.StatusCode >= 400 { + retryable := response.StatusCode >= 500 + return nil, retryable, fmt.Errorf("%s: %s", url, response.Status) + } + if err != nil { + return nil, false, err + } + return b, false, nil +} + +func Head(url string) (http.Header, error) { + r, err := GetGlobalHttpClient().Head(url) + if err != nil { + return nil, err + } + defer CloseResponse(r) + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } + return r.Header, nil +} + +func maybeAddAuth(req *http.Request, jwt string) { + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } +} + +func Delete(url string, jwt string) error { + req, err := http.NewRequest(http.MethodDelete, url, nil) + maybeAddAuth(req, jwt) + if err != nil { + return err + } + resp, e := GetGlobalHttpClient().Do(req) + if e != nil { + return e + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + switch resp.StatusCode { + case http.StatusNotFound, http.StatusAccepted, http.StatusOK: + return nil + } + m := make(map[string]interface{}) + if e := json.Unmarshal(body, &m); e == nil { + if s, ok := m["error"].(string); ok { + return errors.New(s) + } + } + return errors.New(string(body)) +} + +func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) { + req, err := http.NewRequest(http.MethodDelete, url, nil) + maybeAddAuth(req, jwt) + if err != nil { + return + } + resp, err := GetGlobalHttpClient().Do(req) + if err != nil { + return + } + defer resp.Body.Close() + body, err = io.ReadAll(resp.Body) + if err != nil { + return + } + httpStatus = resp.StatusCode + return +} + +func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error { + r, err := GetGlobalHttpClient().PostForm(url, values) + if err != nil { + return err + } + defer CloseResponse(r) + if r.StatusCode != 200 { + return fmt.Errorf("%s: %s", url, r.Status) + } + for { + n, err := r.Body.Read(allocatedBytes) + if n > 0 { + eachBuffer(allocatedBytes[:n]) + } + if err != nil { + if err == io.EOF { + return nil + } + return err + } + } +} + +func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error { + r, err := GetGlobalHttpClient().PostForm(url, values) + if err != nil { + return err + } + defer CloseResponse(r) + if r.StatusCode != 200 { + return fmt.Errorf("%s: %s", url, r.Status) + } + return readFn(r.Body) +} + +func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) { + req, err := http.NewRequest(http.MethodGet, fileUrl, nil) + if err != nil { + return "", nil, nil, err + } + + maybeAddAuth(req, jwt) + + response, err := GetGlobalHttpClient().Do(req) + if err != nil { + return "", nil, nil, err + } + header = response.Header + contentDisposition := response.Header["Content-Disposition"] + if len(contentDisposition) > 0 { + idx := strings.Index(contentDisposition[0], "filename=") + if idx != -1 { + filename = contentDisposition[0][idx+len("filename="):] + filename = strings.Trim(filename, "\"") + } + } + resp = response + return +} + +func Do(req *http.Request) (resp *http.Response, err error) { + return GetGlobalHttpClient().Do(req) +} + +func NormalizeUrl(url string) (string, error) { + return GetGlobalHttpClient().NormalizeHttpScheme(url) +} + +func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { + + if cipherKey != nil { + var n int + _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + n = copy(buf, data) + }) + return int64(n), err + } + + req, err := http.NewRequest(http.MethodGet, fileUrl, nil) + if err != nil { + return 0, err + } + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) + } else { + req.Header.Set("Accept-Encoding", "gzip") + } + + r, err := GetGlobalHttpClient().Do(req) + if err != nil { + return 0, err + } + defer CloseResponse(r) + + if r.StatusCode >= 400 { + return 0, fmt.Errorf("%s: %s", fileUrl, r.Status) + } + + var reader io.ReadCloser + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + reader, err = gzip.NewReader(r.Body) + if err != nil { + return 0, err + } + defer reader.Close() + default: + reader = r.Body + } + + var ( + i, m int + n int64 + ) + + // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199 + // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318 + for { + m, err = reader.Read(buf[i:]) + i += m + n += int64(m) + if err == io.EOF { + return n, nil + } + if err != nil { + return n, err + } + if n == int64(len(buf)) { + break + } + } + // drains the response body to avoid memory leak + data, _ := io.ReadAll(reader) + if len(data) != 0 { + glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data)) + } + return n, err +} + +func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { + return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) +} + +func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { + if cipherKey != nil { + return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + } + + req, err := http.NewRequest(http.MethodGet, fileUrl, nil) + maybeAddAuth(req, jwt) + if err != nil { + return false, err + } + + if isFullChunk { + req.Header.Add("Accept-Encoding", "gzip") + } else { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) + } + + r, err := GetGlobalHttpClient().Do(req) + if err != nil { + return true, err + } + defer CloseResponse(r) + if r.StatusCode >= 400 { + retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 499 + return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status) + } + + var reader io.ReadCloser + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + reader, err = gzip.NewReader(r.Body) + defer reader.Close() + default: + reader = r.Body + } + + var ( + m int + ) + buf := mem.Allocate(64 * 1024) + defer mem.Free(buf) + + for { + m, err = reader.Read(buf) + if m > 0 { + fn(buf[:m]) + } + if err == io.EOF { + return false, nil + } + if err != nil { + return true, err + } + } + +} + +func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { + encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt) + if err != nil { + return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) + } + decryptedData, err := util.Decrypt(encryptedData, util.CipherKey(cipherKey)) + if err != nil { + return false, fmt.Errorf("decrypt %s: %v", fileUrl, err) + } + if isContentCompressed { + decryptedData, err = util.DecompressData(decryptedData) + if err != nil { + glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err) + } + } + if len(decryptedData) < int(offset)+size { + return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size) + } + if isFullChunk { + fn(decryptedData) + } else { + fn(decryptedData[int(offset) : int(offset)+size]) + } + return false, nil +} + +func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*http.Response, io.ReadCloser, error) { + + req, err := http.NewRequest(http.MethodGet, fileUrl, nil) + if err != nil { + return nil, nil, err + } + if rangeHeader != "" { + req.Header.Add("Range", rangeHeader) + } else { + req.Header.Add("Accept-Encoding", "gzip") + } + + maybeAddAuth(req, jwt) + + r, err := GetGlobalHttpClient().Do(req) + if err != nil { + return nil, nil, err + } + if r.StatusCode >= 400 { + CloseResponse(r) + return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status) + } + + var reader io.ReadCloser + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "gzip": + reader, err = gzip.NewReader(r.Body) + if err != nil { + return nil, nil, err + } + default: + reader = r.Body + } + + return r, reader, nil +} + +func CloseResponse(resp *http.Response) { + if resp == nil || resp.Body == nil { + return + } + reader := &CountingReader{reader: resp.Body} + io.Copy(io.Discard, reader) + resp.Body.Close() + if reader.BytesRead > 0 { + glog.V(1).Infof("response leftover %d bytes", reader.BytesRead) + } +} + +func CloseRequest(req *http.Request) { + reader := &CountingReader{reader: req.Body} + io.Copy(io.Discard, reader) + req.Body.Close() + if reader.BytesRead > 0 { + glog.V(1).Infof("request leftover %d bytes", reader.BytesRead) + } +} + +type CountingReader struct { + reader io.Reader + BytesRead int +} + +func (r *CountingReader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + r.BytesRead += n + return n, err +} + +func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { + + var shouldRetry bool + + for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { + for _, urlString := range urlStrings { + n = 0 + if strings.Contains(urlString, "%") { + urlString = url.PathEscape(urlString) + } + shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + if n < len(buffer) { + x := copy(buffer[n:], data) + n += x + } + }) + if !shouldRetry { + break + } + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", urlString, err) + } else { + break + } + } + if err != nil && shouldRetry { + glog.V(0).Infof("retry reading in %v", waitTime) + time.Sleep(waitTime) + } else { + break + } + } + + return n, err + +}
\ No newline at end of file |
