diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 303 |
1 files changed, 245 insertions, 58 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 44e93d297..f1a539ac5 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -3,15 +3,24 @@ package s3api import ( "crypto/md5" "encoding/json" + "encoding/xml" "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "io" "io/ioutil" "net/http" + "net/url" + "sort" "strings" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "github.com/gorilla/mux" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_server "github.com/chrislusf/seaweedfs/weed/server" + "github.com/chrislusf/seaweedfs/weed/util" ) var ( @@ -20,6 +29,7 @@ var ( func init() { client = &http.Client{Transport: &http.Transport{ + MaxIdleConns: 1024, MaxIdleConnsPerHost: 1024, }} } @@ -28,50 +38,73 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html - vars := mux.Vars(r) - bucket := vars["bucket"] - object := getObject(vars) + bucket, object := getBucketAndObject(r) _, err := validateContentMd5(r.Header) if err != nil { - writeErrorResponse(w, ErrInvalidDigest, r.URL) + writeErrorResponse(w, s3err.ErrInvalidDigest, r.URL) return } - rAuthType := getRequestAuthType(r) dataReader := r.Body - if rAuthType == authTypeStreamingSigned { - dataReader = newSignV4ChunkedReader(r) + if s3a.iam.isEnabled() { + rAuthType := getRequestAuthType(r) + var s3ErrCode s3err.ErrorCode + switch rAuthType { + case authTypeStreamingSigned: + dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) + case authTypeSignedV2, authTypePresignedV2: + _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r) + case authTypePresigned, authTypeSigned: + _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r) + } + if s3ErrCode != s3err.ErrNone { + writeErrorResponse(w, s3ErrCode, r.URL) + return + } } + defer dataReader.Close() - uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket) + if strings.HasSuffix(object, "/") { + if err := s3a.mkdir(s3a.option.BucketsPath, bucket+object, nil); err != nil { + writeErrorResponse(w, s3err.ErrInternalError, r.URL) + return + } + } else { + uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) - etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader) + etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader) - if errCode != ErrNone { - writeErrorResponse(w, errCode, r.URL) - return - } + if errCode != s3err.ErrNone { + writeErrorResponse(w, errCode, r.URL) + return + } - setEtag(w, etag) + setEtag(w, etag) + } writeSuccessResponseEmpty(w) } +func urlPathEscape(object string) string { + var escapedParts []string + for _, part := range strings.Split(object, "/") { + escapedParts = append(escapedParts, url.PathEscape(part)) + } + return strings.Join(escapedParts, "/") +} + func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - bucket := vars["bucket"] - object := getObject(vars) + bucket, object := getBucketAndObject(r) if strings.HasSuffix(r.URL.Path, "/") { - writeErrorResponse(w, ErrNotImplemented, r.URL) + writeErrorResponse(w, s3err.ErrNotImplemented, r.URL) return } destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) s3a.proxyToFiler(w, r, destUrl, passThroughResponse) @@ -79,12 +112,10 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - bucket := vars["bucket"] - object := getObject(vars) + bucket, object := getBucketAndObject(r) destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) s3a.proxyToFiler(w, r, destUrl, passThroughResponse) @@ -92,29 +123,152 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - bucket := vars["bucket"] - object := getObject(vars) + bucket, object := getBucketAndObject(r) - destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true", + s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object)) - s3a.proxyToFiler(w, r, destUrl, func(proxyResonse *http.Response, w http.ResponseWriter) { - for k, v := range proxyResonse.Header { + s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) { + for k, v := range proxyResponse.Header { w.Header()[k] = v } w.WriteHeader(http.StatusNoContent) }) +} + +// / ObjectIdentifier carries key name for the object to delete. +type ObjectIdentifier struct { + ObjectName string `xml:"Key"` +} + +// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted. +type DeleteObjectsRequest struct { + // Element to enable quiet mode for the request + Quiet bool + // List of objects to be deleted + Objects []ObjectIdentifier `xml:"Object"` +} + +// DeleteError structure. +type DeleteError struct { + Code string + Message string + Key string +} + +// DeleteObjectsResponse container for multiple object deletes. +type DeleteObjectsResponse struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"` + // Collection of all deleted objects + DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"` + + // Collection of errors deleting certain objects. + Errors []DeleteError `xml:"Error,omitempty"` } // DeleteMultipleObjectsHandler - Delete multiple objects func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) { - // TODO - writeErrorResponse(w, ErrNotImplemented, r.URL) + + bucket, _ := getBucketAndObject(r) + + deleteXMLBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + writeErrorResponse(w, s3err.ErrInternalError, r.URL) + return + } + + deleteObjects := &DeleteObjectsRequest{} + if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil { + writeErrorResponse(w, s3err.ErrMalformedXML, r.URL) + return + } + + var deletedObjects []ObjectIdentifier + var deleteErrors []DeleteError + + directoriesWithDeletion := make(map[string]int) + + s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + // delete file entries + for _, object := range deleteObjects.Objects { + + lastSeparator := strings.LastIndex(object.ObjectName, "/") + parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false + if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) { + entryName = object.ObjectName[lastSeparator+1:] + parentDirectoryPath = "/" + object.ObjectName[:lastSeparator] + } + parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath) + + err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive) + if err == nil { + directoriesWithDeletion[parentDirectoryPath]++ + deletedObjects = append(deletedObjects, object) + } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) { + deletedObjects = append(deletedObjects, object) + } else { + delete(directoriesWithDeletion, parentDirectoryPath) + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err.Error(), + Key: object.ObjectName, + }) + } + } + + // purge empty folders, only checking folders with deletions + for len(directoriesWithDeletion) > 0 { + directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion) + } + + return nil + }) + + deleteResp := DeleteObjectsResponse{} + if !deleteObjects.Quiet { + deleteResp.DeletedObjects = deletedObjects + } + deleteResp.Errors = deleteErrors + + writeSuccessResponseXML(w, encodeResponse(deleteResp)) + } -func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) { +func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) { + var allDirs []string + for dir, _ := range directoriesWithDeletion { + allDirs = append(allDirs, dir) + } + sort.Slice(allDirs, func(i, j int) bool { + return len(allDirs[i]) > len(allDirs[j]) + }) + newDirectoriesWithDeletion = make(map[string]int) + for _, dir := range allDirs { + parentDir, dirName := util.FullPath(dir).DirAndName() + if parentDir == s3a.option.BucketsPath { + continue + } + if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil { + glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err) + } else { + newDirectoriesWithDeletion[parentDir]++ + } + } + return +} + +var passThroughHeaders = []string{ + "response-cache-control", + "response-content-disposition", + "response-content-encoding", + "response-content-language", + "response-content-type", + "response-expires", +} + +func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter)) { glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl) @@ -122,15 +276,27 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des if err != nil { glog.Errorf("NewRequest %s: %v", destUrl, err) - writeErrorResponse(w, ErrInternalError, r.URL) + writeErrorResponse(w, s3err.ErrInternalError, r.URL) return } proxyReq.Header.Set("Host", s3a.option.Filer) proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) - proxyReq.Header.Set("Etag-MD5", "True") for header, values := range r.Header { + // handle s3 related headers + passed := false + for _, h := range passThroughHeaders { + if strings.ToLower(header) == h && len(values) > 0 { + proxyReq.Header.Add(header[len("response-"):], values[0]) + passed = true + break + } + } + if passed { + continue + } + // handle other headers for _, value := range values { proxyReq.Header.Add(header, value) } @@ -140,31 +306,44 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des if postErr != nil { glog.Errorf("post to filer: %v", postErr) - writeErrorResponse(w, ErrInternalError, r.URL) + writeErrorResponse(w, s3err.ErrInternalError, r.URL) return } - defer resp.Body.Close() + defer util.CloseResponse(resp) + + if (resp.ContentLength == -1 || resp.StatusCode == 404) && resp.StatusCode != 304 { + if r.Method != "DELETE" { + writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL) + return + } + } responseFn(resp, w) + } -func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) { - for k, v := range proxyResonse.Header { + +func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) { + for k, v := range proxyResponse.Header { w.Header()[k] = v } - w.WriteHeader(proxyResonse.StatusCode) - io.Copy(w, proxyResonse.Body) + if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 { + w.WriteHeader(http.StatusPartialContent) + } else { + w.WriteHeader(proxyResponse.StatusCode) + } + io.Copy(w, proxyResponse.Body) } -func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) { +func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code s3err.ErrorCode) { hash := md5.New() - var body io.Reader = io.TeeReader(dataReader, hash) + var body = io.TeeReader(dataReader, hash) proxyReq, err := http.NewRequest("PUT", uploadUrl, body) if err != nil { glog.Errorf("NewRequest %s: %v", uploadUrl, err) - return "", ErrInternalError + return "", s3err.ErrInternalError } proxyReq.Header.Set("Host", s3a.option.Filer) @@ -178,11 +357,9 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader resp, postErr := client.Do(proxyReq) - dataReader.Close() - if postErr != nil { glog.Errorf("post to filer: %v", postErr) - return "", ErrInternalError + return "", s3err.ErrInternalError } defer resp.Body.Close() @@ -190,21 +367,21 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { - glog.Errorf("upload to filer response read: %v", ra_err) - return etag, ErrInternalError + glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err) + return etag, s3err.ErrInternalError } var ret weed_server.FilerPostResult unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body)) - return "", ErrInternalError + return "", s3err.ErrInternalError } if ret.Error != "" { glog.Errorf("upload to filer error: %v", ret.Error) - return "", ErrInternalError + return "", filerErrorToS3Error(ret.Error) } - return etag, ErrNone + return etag, s3err.ErrNone } func setEtag(w http.ResponseWriter, etag string) { @@ -217,10 +394,20 @@ func setEtag(w http.ResponseWriter, etag string) { } } -func getObject(vars map[string]string) string { - object := vars["object"] +func getBucketAndObject(r *http.Request) (bucket, object string) { + vars := mux.Vars(r) + bucket = vars["bucket"] + object = vars["object"] if !strings.HasPrefix(object, "/") { object = "/" + object } - return object + + return +} + +func filerErrorToS3Error(errString string) s3err.ErrorCode { + if strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory") { + return s3err.ErrExistingObjectIsDirectory + } + return s3err.ErrInternalError } |
