diff options
Diffstat (limited to 'weed/operation/upload_content.go')
| -rw-r--r-- | weed/operation/upload_content.go | 145 |
1 files changed, 80 insertions, 65 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index c387d0230..117da1c18 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -1,14 +1,14 @@ package operation import ( - "bytes" + "bufio" "compress/flate" "compress/gzip" + "crypto/rand" "encoding/json" "errors" "fmt" "io" - "io/ioutil" "mime" "mime/multipart" "net/http" @@ -16,6 +16,8 @@ import ( "path/filepath" "strings" + "github.com/valyala/fasthttp" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -77,76 +79,89 @@ func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped boo }, filename, contentIsGzipped, mtype, pairMap, jwt) } +func randomBoundary() string { + var buf [30]byte + _, err := io.ReadFull(rand.Reader, buf[:]) + if err != nil { + panic(err) + } + return fmt.Sprintf("%x", buf[:]) +} + func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { - body_buf := bytes.NewBufferString("") - body_writer := multipart.NewWriter(body_buf) - h := make(textproto.MIMEHeader) - h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) + if mtype == "" { mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename))) } - if mtype != "" { - h.Set("Content-Type", mtype) - } - if isGzipped { - h.Set("Content-Encoding", "gzip") - } - - file_writer, cp_err := body_writer.CreatePart(h) - if cp_err != nil { - glog.V(0).Infoln("error creating form file", cp_err.Error()) - return nil, cp_err - } - if err := fillBufferFunction(file_writer); err != nil { - glog.V(0).Infoln("error copying data", err) - return nil, err - } - content_type := body_writer.FormDataContentType() - if err := body_writer.Close(); err != nil { - glog.V(0).Infoln("error closing body", err) - return nil, err - } + boundary := randomBoundary() + contentType := "multipart/form-data; boundary=" + boundary - req, postErr := http.NewRequest("POST", uploadUrl, body_buf) - if postErr != nil { - glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error()) - return nil, postErr - } - req.Header.Set("Content-Type", content_type) - for k, v := range pairMap { - req.Header.Set(k, v) - } - if jwt != "" { - req.Header.Set("Authorization", "BEARER "+string(jwt)) - } - resp, post_err := client.Do(req) - if post_err != nil { - glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error()) - return nil, post_err - } - defer resp.Body.Close() - etag := getEtag(resp) - resp_body, ra_err := ioutil.ReadAll(resp.Body) - if ra_err != nil { - return nil, ra_err - } var ret UploadResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body)) - return nil, unmarshal_err - } - if ret.Error != "" { - return nil, errors.New(ret.Error) + var etag string + var writeErr error + err := util.PostContent(uploadUrl, func(w *bufio.Writer) { + h := make(textproto.MIMEHeader) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename))) + if mtype != "" { + h.Set("Content-Type", mtype) + } + if isGzipped { + h.Set("Content-Encoding", "gzip") + } + + body_writer := multipart.NewWriter(w) + body_writer.SetBoundary(boundary) + file_writer, cp_err := body_writer.CreatePart(h) + if cp_err != nil { + glog.V(0).Infoln("error creating form file", cp_err.Error()) + writeErr = cp_err + return + } + if err := fillBufferFunction(file_writer); err != nil { + glog.V(0).Infoln("error copying data", err) + writeErr = err + return + } + if err := body_writer.Close(); err != nil { + glog.V(0).Infoln("error closing body", err) + writeErr = err + return + } + w.Flush() + + }, func(header *fasthttp.RequestHeader) { + header.Set("Content-Type", contentType) + for k, v := range pairMap { + header.Set(k, v) + } + if jwt != "" { + header.Set("Authorization", "BEARER "+string(jwt)) + } + }, func(resp *fasthttp.Response) error { + etagBytes := resp.Header.Peek("ETag") + lenEtagBytes := len(etagBytes) + if lenEtagBytes > 2 && etagBytes[0] == '"' && etagBytes[lenEtagBytes-1] == '"' { + etag = string(etagBytes[1 : len(etagBytes)-1]) + } + + unmarshal_err := json.Unmarshal(resp.Body(), &ret) + if unmarshal_err != nil { + glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp.Body())) + return unmarshal_err + } + if ret.Error != "" { + return errors.New(ret.Error) + } + return nil + }) + + if writeErr != nil { + return nil, writeErr } - ret.ETag = etag - return &ret, nil -} -func getEtag(r *http.Response) (etag string) { - etag = r.Header.Get("ETag") - if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") { - etag = etag[1 : len(etag)-1] + ret.ETag = etag + if err != nil { + return nil, err } - return + return &ret, nil } |
