diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 117 |
1 files changed, 105 insertions, 12 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 44e93d297..9d03cdbe3 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -3,15 +3,18 @@ package s3api import ( "crypto/md5" "encoding/json" + "encoding/xml" "fmt" "io" "io/ioutil" "net/http" "strings" + "github.com/gorilla/mux" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/server" - "github.com/gorilla/mux" + "github.com/chrislusf/seaweedfs/weed/util" ) var ( @@ -40,12 +43,17 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) rAuthType := getRequestAuthType(r) dataReader := r.Body + var s3ErrCode ErrorCode if rAuthType == authTypeStreamingSigned { - dataReader = newSignV4ChunkedReader(r) + dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r) } + if s3ErrCode != ErrNone { + writeErrorResponse(w, s3ErrCode, r.URL) + return + } + defer dataReader.Close() - uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s", - s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket) + uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object) etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader) @@ -108,10 +116,97 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque } +/// ObjectIdentifier carries key name for the object to delete. +type ObjectIdentifier struct { + ObjectName string `xml:"Key"` +} + +// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted. +type DeleteObjectsRequest struct { + // Element to enable quiet mode for the request + Quiet bool + // List of objects to be deleted + Objects []ObjectIdentifier `xml:"Object"` +} + +// DeleteError structure. +type DeleteError struct { + Code string + Message string + Key string +} + +// DeleteObjectsResponse container for multiple object deletes. +type DeleteObjectsResponse struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"` + + // Collection of all deleted objects + DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"` + + // Collection of errors deleting certain objects. + Errors []DeleteError `xml:"Error,omitempty"` +} + // DeleteMultipleObjectsHandler - Delete multiple objects func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) { - // TODO - writeErrorResponse(w, ErrNotImplemented, r.URL) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + deleteXMLBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + + deleteObjects := &DeleteObjectsRequest{} + if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil { + writeErrorResponse(w, ErrMalformedXML, r.URL) + return + } + + var index int + + var deletedObjects []ObjectIdentifier + var deleteErrors []DeleteError + s3a.streamRemove(deleteObjects.Quiet, func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool) { + if index >= len(deleteObjects.Objects) { + finished = true + return + } + + object := deleteObjects.Objects[index] + + lastSeparator := strings.LastIndex(object.ObjectName, "/") + parentDirectoryPath, entryName, isDeleteData, isRecursive = "/", object.ObjectName, true, false + if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) { + entryName = object.ObjectName[lastSeparator+1:] + parentDirectoryPath = "/" + object.ObjectName[:lastSeparator] + } + parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath) + return + }, func(err string) { + object := deleteObjects.Objects[index] + if err == "" { + deletedObjects = append(deletedObjects, object) + } else { + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err, + Key: object.ObjectName, + }) + } + index++ + }) + + deleteResp := DeleteObjectsResponse{} + if !deleteObjects.Quiet { + deleteResp.DeletedObjects = deletedObjects + } + deleteResp.Errors = deleteErrors + + writeSuccessResponseXML(w, encodeResponse(deleteResp)) + } func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) { @@ -128,7 +223,6 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des proxyReq.Header.Set("Host", s3a.option.Filer) proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) - proxyReq.Header.Set("Etag-MD5", "True") for header, values := range r.Header { for _, value := range values { @@ -143,9 +237,10 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des writeErrorResponse(w, ErrInternalError, r.URL) return } - defer resp.Body.Close() + defer util.CloseResponse(resp) responseFn(resp, w) + } func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) { for k, v := range proxyResonse.Header { @@ -155,10 +250,10 @@ func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) { io.Copy(w, proxyResonse.Body) } -func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) { +func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code ErrorCode) { hash := md5.New() - var body io.Reader = io.TeeReader(dataReader, hash) + var body = io.TeeReader(dataReader, hash) proxyReq, err := http.NewRequest("PUT", uploadUrl, body) @@ -178,8 +273,6 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader resp, postErr := client.Do(proxyReq) - dataReader.Close() - if postErr != nil { glog.Errorf("post to filer: %v", postErr) return "", ErrInternalError |
