diff options
Diffstat (limited to 'weed/operation/upload_content.go')
| -rw-r--r-- | weed/operation/upload_content.go | 143 |
1 files changed, 87 insertions, 56 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index 8e7c6f733..3d41d2eb5 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -4,22 +4,31 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" "io" - "io/ioutil" "mime" "mime/multipart" + "net" "net/http" "net/textproto" "path/filepath" "strings" "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" ) +type UploadOption struct { + UploadUrl string + Filename string + Cipher bool + IsInputCompressed bool + MimeType string + PairMap map[string]string + Jwt security.EncodedJwt +} + type UploadResult struct { Name string `json:"name,omitempty"` Size uint32 `json:"size,omitempty"` @@ -57,68 +66,72 @@ var ( func init() { HttpClient = &http.Client{Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 10 * time.Second, + }).DialContext, MaxIdleConns: 1024, MaxIdleConnsPerHost: 1024, }} } -var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) +var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "") // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { - uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) +func UploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + uploadResult, err = retriedUploadData(data, option) return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { - uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt) +func Upload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { + uploadResult, err, data = doUpload(reader, option) return } -func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) { +func doUpload(reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) { bytesReader, ok := reader.(*util.BytesReader) if ok { data = bytesReader.Bytes } else { - data, err = ioutil.ReadAll(reader) + data, err = io.ReadAll(reader) if err != nil { err = fmt.Errorf("read input: %v", err) return } } - uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + uploadResult, uploadErr := retriedUploadData(data, option) return uploadResult, uploadErr, data } -func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { +func retriedUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { for i := 0; i < 3; i++ { - uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) + uploadResult, err = doUploadData(data, option) if err == nil { uploadResult.RetryCount = i return } else { - glog.Warningf("uploading to %s: %v", uploadUrl, err) + glog.Warningf("uploading to %s: %v", option.UploadUrl, err) } time.Sleep(time.Millisecond * time.Duration(237*(i+1))) } return } -func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { - contentIsGzipped := isInputCompressed +func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult, err error) { + contentIsGzipped := option.IsInputCompressed shouldGzipNow := false - if !isInputCompressed { - if mtype == "" { - mtype = http.DetectContentType(data) - // println("detect1 mimetype to", mtype) - if mtype == "application/octet-stream" { - mtype = "" + if !option.IsInputCompressed { + if option.MimeType == "" { + option.MimeType = http.DetectContentType(data) + // println("detect1 mimetype to", MimeType) + if option.MimeType == "application/octet-stream" { + option.MimeType = "" } } - if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed { + if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(option.Filename), option.MimeType); iAmSure && shouldBeCompressed { shouldGzipNow = true - } else if !iAmSure && mtype == "" && len(data) > 16*1024 { + } else if !iAmSure && option.MimeType == "" && len(data) > 16*1024 { var compressed []byte compressed, err = util.GzipData(data[0:128]) shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90% @@ -131,14 +144,14 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i // this could be double copying clearDataLen = len(data) clearData := data - if shouldGzipNow && !cipher { + if shouldGzipNow && !option.Cipher { compressed, compressErr := util.GzipData(data) // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed)) if compressErr == nil { data = compressed contentIsGzipped = true } - } else if isInputCompressed { + } else if option.IsInputCompressed { // just to get the clear data length clearData, err = util.DecompressData(data) if err == nil { @@ -146,7 +159,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i } } - if cipher { + if option.Cipher { // encrypt(gzip(data)) // encrypt @@ -158,23 +171,39 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i } // upload data - uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + uploadResult, err = upload_content(func(w io.Writer) (err error) { _, err = w.Write(encryptedData) return - }, "", false, len(encryptedData), "", nil, jwt) + }, len(encryptedData), &UploadOption{ + UploadUrl: option.UploadUrl, + Filename: "", + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: option.Jwt, + }) if uploadResult == nil { return } - uploadResult.Name = filename - uploadResult.Mime = mtype + uploadResult.Name = option.Filename + uploadResult.Mime = option.MimeType uploadResult.CipherKey = cipherKey uploadResult.Size = uint32(clearDataLen) } else { // upload data - uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + uploadResult, err = upload_content(func(w io.Writer) (err error) { _, err = w.Write(data) return - }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt) + }, len(data), &UploadOption{ + UploadUrl: option.UploadUrl, + Filename: option.Filename, + Cipher: false, + IsInputCompressed: contentIsGzipped, + MimeType: option.MimeType, + PairMap: option.PairMap, + Jwt: option.Jwt, + }) if uploadResult == nil { return } @@ -187,20 +216,21 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i return uploadResult, err } -func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { +func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) { buf := GetBuffer() defer PutBuffer(buf) body_writer := multipart.NewWriter(buf) h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) - h.Set("Idempotency-Key", uploadUrl) - if mtype == "" { - mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) + filename := fileNameEscaper.Replace(option.Filename) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename)) + h.Set("Idempotency-Key", option.UploadUrl) + if option.MimeType == "" { + option.MimeType = mime.TypeByExtension(strings.ToLower(filepath.Ext(option.Filename))) } - if mtype != "" { - h.Set("Content-Type", mtype) + if option.MimeType != "" { + h.Set("Content-Type", option.MimeType) } - if isGzipped { + if option.IsInputCompressed { h.Set("Content-Encoding", "gzip") } @@ -219,28 +249,29 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return nil, err } - req, postErr := http.NewRequest("POST", uploadUrl, bytes.NewReader(buf.Bytes())) + req, postErr := http.NewRequest("POST", option.UploadUrl, bytes.NewReader(buf.Bytes())) if postErr != nil { - glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr) - return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr) + glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr) + return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr) } req.Header.Set("Content-Type", content_type) - for k, v := range pairMap { + for k, v := range option.PairMap { req.Header.Set(k, v) } - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) + if option.Jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(option.Jwt)) } // print("+") resp, post_err := HttpClient.Do(req) if post_err != nil { if strings.Contains(post_err.Error(), "connection reset by peer") || strings.Contains(post_err.Error(), "use of closed network connection") { + glog.V(1).Infof("repeat error upload request %s: %v", option.UploadUrl, postErr) resp, post_err = HttpClient.Do(req) } } if post_err != nil { - return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err) + return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err) } // print("-") defer util.CloseResponse(resp) @@ -252,18 +283,18 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error return &ret, nil } - resp_body, ra_err := ioutil.ReadAll(resp.Body) + resp_body, ra_err := io.ReadAll(resp.Body) if ra_err != nil { - return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err) + return nil, fmt.Errorf("read response body %v: %v", option.UploadUrl, ra_err) } unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body)) - return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err) + glog.Errorf("unmarshal %s: %v", option.UploadUrl, string(resp_body)) + return nil, fmt.Errorf("unmarshal %v: %v", option.UploadUrl, unmarshal_err) } if ret.Error != "" { - return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error) + return nil, fmt.Errorf("unmarshalled error %v: %v", option.UploadUrl, ret.Error) } ret.ETag = etag ret.ContentMd5 = resp.Header.Get("Content-MD5") |
