aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/upload_content.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/upload_content.go')
-rw-r--r--weed/operation/upload_content.go172
1 files changed, 150 insertions, 22 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 030bf5889..658588ec3 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "crypto/md5"
"encoding/json"
"errors"
"fmt"
@@ -13,38 +14,166 @@ import (
"net/textproto"
"path/filepath"
"strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
- ETag string `json:"eTag,omitempty"`
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+ ETag string `json:"eTag,omitempty"`
+ CipherKey []byte `json:"cipherKey,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Gzip uint32 `json:"gzip,omitempty"`
+ Md5 string `json:"md5,omitempty"`
+}
+
+func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
+ return &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: offset,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsCompressed: uploadResult.Gzip > 0,
+ }
+}
+
+// HTTPClient interface for testing
+type HTTPClient interface {
+ Do(req *http.Request) (*http.Response, error)
}
var (
- client *http.Client
+ HttpClient HTTPClient
)
func init() {
- client = &http.Client{Transport: &http.Transport{
+ HttpClient = &http.Client{Transport: &http.Transport{
MaxIdleConnsPerHost: 1024,
}}
}
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
-// Upload sends a POST request to a volume server to upload the content
-func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
- return upload_content(uploadUrl, func(w io.Writer) (err error) {
- _, err = io.Copy(w, reader)
+// Upload sends a POST request to a volume server to upload the content with adjustable compression level
+func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ if uploadResult != nil {
+ uploadResult.Md5 = util.Md5(data)
+ }
+ return
+}
+
+// Upload sends a POST request to a volume server to upload the content with fast compression
+func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ hash := md5.New()
+ reader = io.TeeReader(reader, hash)
+ uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
+ if uploadResult != nil {
+ uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
+ }
+ return
+}
+
+func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ data, err = ioutil.ReadAll(reader)
+ if err != nil {
+ err = fmt.Errorf("read input: %v", err)
+ return
+ }
+ uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ return uploadResult, uploadErr, data
+}
+
+func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := isInputCompressed
+ shouldGzipNow := false
+ if !isInputCompressed {
+ if mtype == "" {
+ mtype = http.DetectContentType(data)
+ // println("detect1 mimetype to", mtype)
+ if mtype == "application/octet-stream" {
+ mtype = ""
+ }
+ }
+ if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed {
+ shouldGzipNow = true
+ } else if !iAmSure && mtype == "" && len(data) > 128 {
+ var compressed []byte
+ compressed, err = util.GzipData(data[0:128])
+ shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
+ }
+ }
+
+ var clearDataLen int
+
+ // gzip if possible
+ // this could be double copying
+ clearDataLen = len(data)
+ if shouldGzipNow {
+ compressed, compressErr := util.GzipData(data)
+ // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
+ if compressErr == nil {
+ data = compressed
+ contentIsGzipped = true
+ }
+ } else if isInputCompressed {
+ // just to get the clear data length
+ clearData, err := util.DecompressData(data)
+ if err == nil {
+ clearDataLen = len(clearData)
+ }
+ }
+
+ if cipher {
+ // encrypt(gzip(data))
+
+ // encrypt
+ cipherKey := util.GenCipherKey()
+ encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
+ if encryptionErr != nil {
+ err = fmt.Errorf("encrypt input: %v", encryptionErr)
+ return
+ }
+
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(encryptedData)
+ return
+ }, "", false, len(encryptedData), "", nil, jwt)
+ if uploadResult != nil {
+ uploadResult.Name = filename
+ uploadResult.Mime = mtype
+ uploadResult.CipherKey = cipherKey
+ }
+ } else {
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(data)
+ return
+ }, filename, contentIsGzipped, 0, mtype, pairMap, jwt)
+ }
+
+ if uploadResult == nil {
return
- }, filename, isGzipped, mtype, pairMap, jwt)
+ }
+
+ uploadResult.Size = uint32(clearDataLen)
+ if contentIsGzipped {
+ uploadResult.Gzip = 1
+ }
+
+ return uploadResult, err
}
-func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
+
+func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
body_buf := bytes.NewBufferString("")
body_writer := multipart.NewWriter(body_buf)
h := make(textproto.MIMEHeader)
@@ -58,9 +187,6 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if isGzipped {
h.Set("Content-Encoding", "gzip")
}
- if jwt != "" {
- h.Set("Authorization", "BEARER "+string(jwt))
- }
file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil {
@@ -86,24 +212,26 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
for k, v := range pairMap {
req.Header.Set(k, v)
}
- resp, post_err := client.Do(req)
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+ resp, post_err := HttpClient.Do(req)
if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
return nil, post_err
}
defer resp.Body.Close()
- if resp.StatusCode < http.StatusOK ||
- resp.StatusCode > http.StatusIMUsed {
- return nil, errors.New(http.StatusText(resp.StatusCode))
- }
-
+ var ret UploadResult
etag := getEtag(resp)
+ if resp.StatusCode == http.StatusNoContent {
+ ret.ETag = etag
+ return &ret, nil
+ }
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
return nil, ra_err
}
- var ret UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))