aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_put.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--weed/s3api/s3api_object_handlers_put.go207
1 files changed, 207 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
new file mode 100644
index 000000000..49d385afc
--- /dev/null
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -0,0 +1,207 @@
+package s3api
+
+import (
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/pquerna/cachecontrol/cacheobject"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ weed_server "github.com/seaweedfs/seaweedfs/weed/server"
+)
+
+func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
+
+ // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
+
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("PutObjectHandler %s %s", bucket, object)
+
+ _, err := validateContentMd5(r.Header)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
+ return
+ }
+
+ if r.Header.Get("Cache-Control") != "" {
+ if _, err = cacheobject.ParseRequestCacheControl(r.Header.Get("Cache-Control")); err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
+ return
+ }
+ }
+
+ if r.Header.Get("Expires") != "" {
+ if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedDate)
+ return
+ }
+ }
+
+ dataReader := r.Body
+ rAuthType := getRequestAuthType(r)
+ if s3a.iam.isEnabled() {
+ var s3ErrCode s3err.ErrorCode
+ switch rAuthType {
+ case authTypeStreamingSigned:
+ dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
+ case authTypeSignedV2, authTypePresignedV2:
+ _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
+ case authTypePresigned, authTypeSigned:
+ _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
+ }
+ if s3ErrCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, s3ErrCode)
+ return
+ }
+ } else {
+ if authTypeStreamingSigned == rAuthType {
+ s3err.WriteErrorResponse(w, r, s3err.ErrAuthNotSetup)
+ return
+ }
+ }
+ defer dataReader.Close()
+
+ objectContentType := r.Header.Get("Content-Type")
+ if strings.HasSuffix(object, "/") && r.ContentLength <= 1024 {
+ if err := s3a.mkdir(
+ s3a.option.BucketsPath, bucket+strings.TrimSuffix(object, "/"),
+ func(entry *filer_pb.Entry) {
+ if objectContentType == "" {
+ objectContentType = s3_constants.FolderMimeType
+ }
+ if r.ContentLength > 0 {
+ entry.Content, _ = io.ReadAll(r.Body)
+ }
+ entry.Attributes.Mime = objectContentType
+ }); err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ } else {
+ uploadUrl := s3a.toFilerUrl(bucket, object)
+ if objectContentType == "" {
+ dataReader = mimeDetect(r, dataReader)
+ }
+
+ etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
+
+ if errCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, errCode)
+ return
+ }
+
+ setEtag(w, etag)
+ }
+
+ writeSuccessResponseEmpty(w, r)
+}
+
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string) (etag string, code s3err.ErrorCode) {
+
+ hash := md5.New()
+ var body = io.TeeReader(dataReader, hash)
+
+ proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
+
+ if err != nil {
+ glog.Errorf("NewRequest %s: %v", uploadUrl, err)
+ return "", s3err.ErrInternalError
+ }
+
+ proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+ if destination != "" {
+ proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
+ }
+
+ if s3a.option.FilerGroup != "" {
+ query := proxyReq.URL.Query()
+ query.Add("collection", s3a.getCollectionName(bucket))
+ proxyReq.URL.RawQuery = query.Encode()
+ }
+
+ for header, values := range r.Header {
+ for _, value := range values {
+ proxyReq.Header.Add(header, value)
+ }
+ }
+ // ensure that the Authorization header is overriding any previous
+ // Authorization header which might be already present in proxyReq
+ s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
+ resp, postErr := s3a.client.Do(proxyReq)
+
+ if postErr != nil {
+ glog.Errorf("post to filer: %v", postErr)
+ return "", s3err.ErrInternalError
+ }
+ defer resp.Body.Close()
+
+ etag = fmt.Sprintf("%x", hash.Sum(nil))
+
+ resp_body, ra_err := io.ReadAll(resp.Body)
+ if ra_err != nil {
+ glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
+ return etag, s3err.ErrInternalError
+ }
+ var ret weed_server.FilerPostResult
+ unmarshal_err := json.Unmarshal(resp_body, &ret)
+ if unmarshal_err != nil {
+ glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
+ return "", s3err.ErrInternalError
+ }
+ if ret.Error != "" {
+ glog.Errorf("upload to filer error: %v", ret.Error)
+ return "", filerErrorToS3Error(ret.Error)
+ }
+
+ return etag, s3err.ErrNone
+}
+
+func setEtag(w http.ResponseWriter, etag string) {
+ if etag != "" {
+ if strings.HasPrefix(etag, "\"") {
+ w.Header()["ETag"] = []string{etag}
+ } else {
+ w.Header()["ETag"] = []string{"\"" + etag + "\""}
+ }
+ }
+}
+
+func filerErrorToS3Error(errString string) s3err.ErrorCode {
+ switch {
+ case strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory"):
+ return s3err.ErrExistingObjectIsDirectory
+ case strings.HasSuffix(errString, "is a file"):
+ return s3err.ErrExistingObjectIsFile
+ default:
+ return s3err.ErrInternalError
+ }
+}
+
+func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) {
+ encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite)
+
+ if encodedJwt == "" {
+ return
+ }
+
+ r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
+}
+
+func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string {
+ var encodedJwt security.EncodedJwt
+ if isWrite {
+ encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec)
+ } else {
+ encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)
+ }
+ return string(encodedJwt)
+}