aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/upload_content.go
diff options
context:
space:
mode:
authorustuzhanin <55892859+ustuzhanin@users.noreply.github.com>2020-10-02 22:47:25 +0500
committerGitHub <noreply@github.com>2020-10-02 22:47:25 +0500
commit3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch)
treee0b42e531d18136d9e272258187a305690ee2b4d /weed/operation/upload_content.go
parentcbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff)
parent9ab98fa912814686b3035a97b5173c1628fbc0fc (diff)
downloadseaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.tar.xz
seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.zip
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/operation/upload_content.go')
-rw-r--r--weed/operation/upload_content.go67
1 files changed, 38 insertions, 29 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index cb129daa2..e9002d09d 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,9 +2,7 @@ package operation
import (
"bytes"
- "crypto/md5"
"encoding/json"
- "errors"
"fmt"
"io"
"io/ioutil"
@@ -13,6 +11,7 @@ import (
"net/http"
"net/textproto"
"path/filepath"
+ "runtime/debug"
"strings"
"time"
@@ -23,17 +22,18 @@ import (
)
type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
- ETag string `json:"eTag,omitempty"`
- CipherKey []byte `json:"cipherKey,omitempty"`
- Mime string `json:"mime,omitempty"`
- Gzip uint32 `json:"gzip,omitempty"`
- Md5 string `json:"md5,omitempty"`
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+ ETag string `json:"eTag,omitempty"`
+ CipherKey []byte `json:"cipherKey,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Gzip uint32 `json:"gzip,omitempty"`
+ ContentMd5 string `json:"contentMd5,omitempty"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
+ fid, _ := filer_pb.ToFileIdObject(fileId)
return &filer_pb.FileChunk{
FileId: fileId,
Offset: offset,
@@ -42,6 +42,7 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *fi
ETag: uploadResult.ETag,
CipherKey: uploadResult.CipherKey,
IsCompressed: uploadResult.Gzip > 0,
+ Fid: fid,
}
}
@@ -64,21 +65,13 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
- if uploadResult != nil {
- uploadResult.Md5 = util.Md5(data)
- }
+ uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
- hash := md5.New()
- reader = io.TeeReader(reader, hash)
uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
- if uploadResult != nil {
- uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
- }
return
}
@@ -88,10 +81,22 @@ func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader,
err = fmt.Errorf("read input: %v", err)
return
}
- uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
return uploadResult, uploadErr, data
}
+func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ for i := 0; i < 1; i++ {
+ uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ if err == nil {
+ return
+ } else {
+ glog.Warningf("uploading to %s: %v", uploadUrl, err)
+ }
+ }
+ return
+}
+
func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
contentIsGzipped := isInputCompressed
shouldGzipNow := false
@@ -205,8 +210,8 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
if postErr != nil {
- glog.V(1).Infof("failing to upload to %s: %v", uploadUrl, postErr)
- return nil, fmt.Errorf("failing to upload to %s: %v", uploadUrl, postErr)
+ glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr)
+ return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr)
}
req.Header.Set("Content-Type", content_type)
for k, v := range pairMap {
@@ -217,10 +222,11 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
}
resp, post_err := HttpClient.Do(req)
if post_err != nil {
- glog.V(1).Infof("failing to upload to %v: %v", uploadUrl, post_err)
- return nil, fmt.Errorf("failing to upload to %v: %v", uploadUrl, post_err)
+ glog.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
+ debug.PrintStack()
+ return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
}
- defer resp.Body.Close()
+ defer util.CloseResponse(resp)
var ret UploadResult
etag := getEtag(resp)
@@ -228,19 +234,22 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
ret.ETag = etag
return &ret, nil
}
+
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
- return nil, ra_err
+ return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err)
}
+
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
- return nil, unmarshal_err
+ glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body))
+ return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err)
}
if ret.Error != "" {
- return nil, errors.New(ret.Error)
+ return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error)
}
ret.ETag = etag
+ ret.ContentMd5 = resp.Header.Get("Content-MD5")
return &ret, nil
}