diff options
Diffstat (limited to 'weed/operation/upload_content.go')
| -rw-r--r-- | weed/operation/upload_content.go | 172 |
1 files changed, 150 insertions, 22 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 030bf5889..658588ec3 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -2,6 +2,7 @@ package operation import ( "bytes" + "crypto/md5" "encoding/json" "errors" "fmt" @@ -13,38 +14,166 @@ import ( "net/textproto" "path/filepath" "strings" + "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" ) type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` - ETag string `json:"eTag,omitempty"` + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` + ETag string `json:"eTag,omitempty"` + CipherKey []byte `json:"cipherKey,omitempty"` + Mime string `json:"mime,omitempty"` + Gzip uint32 `json:"gzip,omitempty"` + Md5 string `json:"md5,omitempty"` +} + +func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk { + return &filer_pb.FileChunk{ + FileId: fileId, + Offset: offset, + Size: uint64(uploadResult.Size), + Mtime: time.Now().UnixNano(), + ETag: uploadResult.ETag, + CipherKey: uploadResult.CipherKey, + IsCompressed: uploadResult.Gzip > 0, + } +} + +// HTTPClient interface for testing +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) } var ( - client *http.Client + HttpClient HTTPClient ) func init() { - client = &http.Client{Transport: &http.Transport{ + HttpClient = &http.Client{Transport: &http.Transport{ MaxIdleConnsPerHost: 1024, }} } var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") -// Upload sends a POST request to a volume server to upload the content -func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { - return upload_content(uploadUrl, func(w io.Writer) (err error) { - _, err = io.Copy(w, reader) +// Upload sends a POST request to a volume server to upload the content with adjustable compression level +func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + if uploadResult != nil { + uploadResult.Md5 = util.Md5(data) + } + return +} + +// Upload sends a POST request to a volume server to upload the content with fast compression +func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { + hash := md5.New() + reader = io.TeeReader(reader, hash) + uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) + if uploadResult != nil { + uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) + } + return +} + +func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { + data, err = ioutil.ReadAll(reader) + if err != nil { + err = fmt.Errorf("read input: %v", err) + return + } + uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + return uploadResult, uploadErr, data +} + +func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + contentIsGzipped := isInputCompressed + shouldGzipNow := false + if !isInputCompressed { + if mtype == "" { + mtype = http.DetectContentType(data) + // println("detect1 mimetype to", mtype) + if mtype == "application/octet-stream" { + mtype = "" + } + } + if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed { + shouldGzipNow = true + } else if !iAmSure && mtype == "" && len(data) > 128 { + var compressed []byte + compressed, err = util.GzipData(data[0:128]) + shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90% + } + } + + var clearDataLen int + + // gzip if possible + // this could be double copying + clearDataLen = len(data) + if shouldGzipNow { + compressed, compressErr := util.GzipData(data) + // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) + if compressErr == nil { + data = compressed + contentIsGzipped = true + } + } else if isInputCompressed { + // just to get the clear data length + clearData, err := util.DecompressData(data) + if err == nil { + clearDataLen = len(clearData) + } + } + + if cipher { + // encrypt(gzip(data)) + + // encrypt + cipherKey := util.GenCipherKey() + encryptedData, encryptionErr := util.Encrypt(data, cipherKey) + if encryptionErr != nil { + err = fmt.Errorf("encrypt input: %v", encryptionErr) + return + } + + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(encryptedData) + return + }, "", false, len(encryptedData), "", nil, jwt) + if uploadResult != nil { + uploadResult.Name = filename + uploadResult.Mime = mtype + uploadResult.CipherKey = cipherKey + } + } else { + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(data) + return + }, filename, contentIsGzipped, 0, mtype, pairMap, jwt) + } + + if uploadResult == nil { return - }, filename, isGzipped, mtype, pairMap, jwt) + } + + uploadResult.Size = uint32(clearDataLen) + if contentIsGzipped { + uploadResult.Gzip = 1 + } + + return uploadResult, err } -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { + +func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { body_buf := bytes.NewBufferString("") body_writer := multipart.NewWriter(body_buf) h := make(textproto.MIMEHeader) @@ -58,9 +187,6 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error if isGzipped { h.Set("Content-Encoding", "gzip") } - if jwt != "" { - h.Set("Authorization", "BEARER "+string(jwt)) - } file_writer, cp_err := body_writer.CreatePart(h) if cp_err != nil { @@ -86,24 +212,26 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error for k, v := range pairMap { req.Header.Set(k, v) } - resp, post_err := client.Do(req) + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + resp, post_err := HttpClient.Do(req) if post_err != nil { glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) return nil, post_err } defer resp.Body.Close() - if resp.StatusCode < http.StatusOK || - resp.StatusCode > http.StatusIMUsed { - return nil, errors.New(http.StatusText(resp.StatusCode)) - } - + var ret UploadResult etag := getEtag(resp) + if resp.StatusCode == http.StatusNoContent { + ret.ETag = etag + return &ret, nil + } resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { return nil, ra_err } - var ret UploadResult unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body)) |
