aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
-rw-r--r--weed/s3api/s3api_object_handlers.go303
1 files changed, 245 insertions, 58 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 44e93d297..f1a539ac5 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -3,15 +3,24 @@ package s3api
import (
"crypto/md5"
"encoding/json"
+ "encoding/xml"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"io"
"io/ioutil"
"net/http"
+ "net/url"
+ "sort"
"strings"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+
"github.com/gorilla/mux"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
var (
@@ -20,6 +29,7 @@ var (
func init() {
client = &http.Client{Transport: &http.Transport{
+ MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}
@@ -28,50 +38,73 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
- vars := mux.Vars(r)
- bucket := vars["bucket"]
- object := getObject(vars)
+ bucket, object := getBucketAndObject(r)
_, err := validateContentMd5(r.Header)
if err != nil {
- writeErrorResponse(w, ErrInvalidDigest, r.URL)
+ writeErrorResponse(w, s3err.ErrInvalidDigest, r.URL)
return
}
- rAuthType := getRequestAuthType(r)
dataReader := r.Body
- if rAuthType == authTypeStreamingSigned {
- dataReader = newSignV4ChunkedReader(r)
+ if s3a.iam.isEnabled() {
+ rAuthType := getRequestAuthType(r)
+ var s3ErrCode s3err.ErrorCode
+ switch rAuthType {
+ case authTypeStreamingSigned:
+ dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
+ case authTypeSignedV2, authTypePresignedV2:
+ _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
+ case authTypePresigned, authTypeSigned:
+ _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
+ }
+ if s3ErrCode != s3err.ErrNone {
+ writeErrorResponse(w, s3ErrCode, r.URL)
+ return
+ }
}
+ defer dataReader.Close()
- uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
+ if strings.HasSuffix(object, "/") {
+ if err := s3a.mkdir(s3a.option.BucketsPath, bucket+object, nil); err != nil {
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ return
+ }
+ } else {
+ uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
- etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
+ etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
- if errCode != ErrNone {
- writeErrorResponse(w, errCode, r.URL)
- return
- }
+ if errCode != s3err.ErrNone {
+ writeErrorResponse(w, errCode, r.URL)
+ return
+ }
- setEtag(w, etag)
+ setEtag(w, etag)
+ }
writeSuccessResponseEmpty(w)
}
+func urlPathEscape(object string) string {
+ var escapedParts []string
+ for _, part := range strings.Split(object, "/") {
+ escapedParts = append(escapedParts, url.PathEscape(part))
+ }
+ return strings.Join(escapedParts, "/")
+}
+
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- bucket := vars["bucket"]
- object := getObject(vars)
+ bucket, object := getBucketAndObject(r)
if strings.HasSuffix(r.URL.Path, "/") {
- writeErrorResponse(w, ErrNotImplemented, r.URL)
+ writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
return
}
destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
@@ -79,12 +112,10 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- bucket := vars["bucket"]
- object := getObject(vars)
+ bucket, object := getBucketAndObject(r)
destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
@@ -92,29 +123,152 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- bucket := vars["bucket"]
- object := getObject(vars)
+ bucket, object := getBucketAndObject(r)
- destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
- s3a.proxyToFiler(w, r, destUrl, func(proxyResonse *http.Response, w http.ResponseWriter) {
- for k, v := range proxyResonse.Header {
+ s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) {
+ for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
w.WriteHeader(http.StatusNoContent)
})
+}
+
+// / ObjectIdentifier carries key name for the object to delete.
+type ObjectIdentifier struct {
+ ObjectName string `xml:"Key"`
+}
+
+// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
+type DeleteObjectsRequest struct {
+ // Element to enable quiet mode for the request
+ Quiet bool
+ // List of objects to be deleted
+ Objects []ObjectIdentifier `xml:"Object"`
+}
+
+// DeleteError structure.
+type DeleteError struct {
+ Code string
+ Message string
+ Key string
+}
+
+// DeleteObjectsResponse container for multiple object deletes.
+type DeleteObjectsResponse struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"`
+ // Collection of all deleted objects
+ DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"`
+
+ // Collection of errors deleting certain objects.
+ Errors []DeleteError `xml:"Error,omitempty"`
}
// DeleteMultipleObjectsHandler - Delete multiple objects
func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
- // TODO
- writeErrorResponse(w, ErrNotImplemented, r.URL)
+
+ bucket, _ := getBucketAndObject(r)
+
+ deleteXMLBytes, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ return
+ }
+
+ deleteObjects := &DeleteObjectsRequest{}
+ if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
+ writeErrorResponse(w, s3err.ErrMalformedXML, r.URL)
+ return
+ }
+
+ var deletedObjects []ObjectIdentifier
+ var deleteErrors []DeleteError
+
+ directoriesWithDeletion := make(map[string]int)
+
+ s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ // delete file entries
+ for _, object := range deleteObjects.Objects {
+
+ lastSeparator := strings.LastIndex(object.ObjectName, "/")
+ parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false
+ if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
+ entryName = object.ObjectName[lastSeparator+1:]
+ parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
+ }
+ parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
+
+ err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
+ if err == nil {
+ directoriesWithDeletion[parentDirectoryPath]++
+ deletedObjects = append(deletedObjects, object)
+ } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
+ deletedObjects = append(deletedObjects, object)
+ } else {
+ delete(directoriesWithDeletion, parentDirectoryPath)
+ deleteErrors = append(deleteErrors, DeleteError{
+ Code: "",
+ Message: err.Error(),
+ Key: object.ObjectName,
+ })
+ }
+ }
+
+ // purge empty folders, only checking folders with deletions
+ for len(directoriesWithDeletion) > 0 {
+ directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
+ }
+
+ return nil
+ })
+
+ deleteResp := DeleteObjectsResponse{}
+ if !deleteObjects.Quiet {
+ deleteResp.DeletedObjects = deletedObjects
+ }
+ deleteResp.Errors = deleteErrors
+
+ writeSuccessResponseXML(w, encodeResponse(deleteResp))
+
}
-func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
+func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
+ var allDirs []string
+ for dir, _ := range directoriesWithDeletion {
+ allDirs = append(allDirs, dir)
+ }
+ sort.Slice(allDirs, func(i, j int) bool {
+ return len(allDirs[i]) > len(allDirs[j])
+ })
+ newDirectoriesWithDeletion = make(map[string]int)
+ for _, dir := range allDirs {
+ parentDir, dirName := util.FullPath(dir).DirAndName()
+ if parentDir == s3a.option.BucketsPath {
+ continue
+ }
+ if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
+ glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
+ } else {
+ newDirectoriesWithDeletion[parentDir]++
+ }
+ }
+ return
+}
+
+var passThroughHeaders = []string{
+ "response-cache-control",
+ "response-content-disposition",
+ "response-content-encoding",
+ "response-content-language",
+ "response-content-type",
+ "response-expires",
+}
+
+func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter)) {
glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
@@ -122,15 +276,27 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
if err != nil {
glog.Errorf("NewRequest %s: %v", destUrl, err)
- writeErrorResponse(w, ErrInternalError, r.URL)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
return
}
proxyReq.Header.Set("Host", s3a.option.Filer)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
- proxyReq.Header.Set("Etag-MD5", "True")
for header, values := range r.Header {
+ // handle s3 related headers
+ passed := false
+ for _, h := range passThroughHeaders {
+ if strings.ToLower(header) == h && len(values) > 0 {
+ proxyReq.Header.Add(header[len("response-"):], values[0])
+ passed = true
+ break
+ }
+ }
+ if passed {
+ continue
+ }
+ // handle other headers
for _, value := range values {
proxyReq.Header.Add(header, value)
}
@@ -140,31 +306,44 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
- writeErrorResponse(w, ErrInternalError, r.URL)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
return
}
- defer resp.Body.Close()
+ defer util.CloseResponse(resp)
+
+ if (resp.ContentLength == -1 || resp.StatusCode == 404) && resp.StatusCode != 304 {
+ if r.Method != "DELETE" {
+ writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+ return
+ }
+ }
responseFn(resp, w)
+
}
-func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) {
- for k, v := range proxyResonse.Header {
+
+func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) {
+ for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
- w.WriteHeader(proxyResonse.StatusCode)
- io.Copy(w, proxyResonse.Body)
+ if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 {
+ w.WriteHeader(http.StatusPartialContent)
+ } else {
+ w.WriteHeader(proxyResponse.StatusCode)
+ }
+ io.Copy(w, proxyResponse.Body)
}
-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) {
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code s3err.ErrorCode) {
hash := md5.New()
- var body io.Reader = io.TeeReader(dataReader, hash)
+ var body = io.TeeReader(dataReader, hash)
proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
if err != nil {
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
- return "", ErrInternalError
+ return "", s3err.ErrInternalError
}
proxyReq.Header.Set("Host", s3a.option.Filer)
@@ -178,11 +357,9 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
resp, postErr := client.Do(proxyReq)
- dataReader.Close()
-
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
- return "", ErrInternalError
+ return "", s3err.ErrInternalError
}
defer resp.Body.Close()
@@ -190,21 +367,21 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
- glog.Errorf("upload to filer response read: %v", ra_err)
- return etag, ErrInternalError
+ glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
+ return etag, s3err.ErrInternalError
}
var ret weed_server.FilerPostResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
- return "", ErrInternalError
+ return "", s3err.ErrInternalError
}
if ret.Error != "" {
glog.Errorf("upload to filer error: %v", ret.Error)
- return "", ErrInternalError
+ return "", filerErrorToS3Error(ret.Error)
}
- return etag, ErrNone
+ return etag, s3err.ErrNone
}
func setEtag(w http.ResponseWriter, etag string) {
@@ -217,10 +394,20 @@ func setEtag(w http.ResponseWriter, etag string) {
}
}
-func getObject(vars map[string]string) string {
- object := vars["object"]
+func getBucketAndObject(r *http.Request) (bucket, object string) {
+ vars := mux.Vars(r)
+ bucket = vars["bucket"]
+ object = vars["object"]
if !strings.HasPrefix(object, "/") {
object = "/" + object
}
- return object
+
+ return
+}
+
+func filerErrorToS3Error(errString string) s3err.ErrorCode {
+ if strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory") {
+ return s3err.ErrExistingObjectIsDirectory
+ }
+ return s3err.ErrInternalError
}