aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
-rw-r--r--weed/s3api/s3api_object_handlers.go117
1 files changed, 105 insertions, 12 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 44e93d297..9d03cdbe3 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -3,15 +3,18 @@ package s3api
import (
"crypto/md5"
"encoding/json"
+ "encoding/xml"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
+ "github.com/gorilla/mux"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server"
- "github.com/gorilla/mux"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
var (
@@ -40,12 +43,17 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
rAuthType := getRequestAuthType(r)
dataReader := r.Body
+ var s3ErrCode ErrorCode
if rAuthType == authTypeStreamingSigned {
- dataReader = newSignV4ChunkedReader(r)
+ dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
}
+ if s3ErrCode != ErrNone {
+ writeErrorResponse(w, s3ErrCode, r.URL)
+ return
+ }
+ defer dataReader.Close()
- uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
+ uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
@@ -108,10 +116,97 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
}
+/// ObjectIdentifier carries key name for the object to delete.
+type ObjectIdentifier struct {
+ ObjectName string `xml:"Key"`
+}
+
+// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
+type DeleteObjectsRequest struct {
+ // Element to enable quiet mode for the request
+ Quiet bool
+ // List of objects to be deleted
+ Objects []ObjectIdentifier `xml:"Object"`
+}
+
+// DeleteError structure.
+type DeleteError struct {
+ Code string
+ Message string
+ Key string
+}
+
+// DeleteObjectsResponse container for multiple object deletes.
+type DeleteObjectsResponse struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"`
+
+ // Collection of all deleted objects
+ DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"`
+
+ // Collection of errors deleting certain objects.
+ Errors []DeleteError `xml:"Error,omitempty"`
+}
+
// DeleteMultipleObjectsHandler - Delete multiple objects
func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
- // TODO
- writeErrorResponse(w, ErrNotImplemented, r.URL)
+
+ vars := mux.Vars(r)
+ bucket := vars["bucket"]
+
+ deleteXMLBytes, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+
+ deleteObjects := &DeleteObjectsRequest{}
+ if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
+ writeErrorResponse(w, ErrMalformedXML, r.URL)
+ return
+ }
+
+ var index int
+
+ var deletedObjects []ObjectIdentifier
+ var deleteErrors []DeleteError
+ s3a.streamRemove(deleteObjects.Quiet, func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool) {
+ if index >= len(deleteObjects.Objects) {
+ finished = true
+ return
+ }
+
+ object := deleteObjects.Objects[index]
+
+ lastSeparator := strings.LastIndex(object.ObjectName, "/")
+ parentDirectoryPath, entryName, isDeleteData, isRecursive = "/", object.ObjectName, true, false
+ if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
+ entryName = object.ObjectName[lastSeparator+1:]
+ parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
+ }
+ parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
+ return
+ }, func(err string) {
+ object := deleteObjects.Objects[index]
+ if err == "" {
+ deletedObjects = append(deletedObjects, object)
+ } else {
+ deleteErrors = append(deleteErrors, DeleteError{
+ Code: "",
+ Message: err,
+ Key: object.ObjectName,
+ })
+ }
+ index++
+ })
+
+ deleteResp := DeleteObjectsResponse{}
+ if !deleteObjects.Quiet {
+ deleteResp.DeletedObjects = deletedObjects
+ }
+ deleteResp.Errors = deleteErrors
+
+ writeSuccessResponseXML(w, encodeResponse(deleteResp))
+
}
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
@@ -128,7 +223,6 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
proxyReq.Header.Set("Host", s3a.option.Filer)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
- proxyReq.Header.Set("Etag-MD5", "True")
for header, values := range r.Header {
for _, value := range values {
@@ -143,9 +237,10 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
- defer resp.Body.Close()
+ defer util.CloseResponse(resp)
responseFn(resp, w)
+
}
func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) {
for k, v := range proxyResonse.Header {
@@ -155,10 +250,10 @@ func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) {
io.Copy(w, proxyResonse.Body)
}
-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) {
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code ErrorCode) {
hash := md5.New()
- var body io.Reader = io.TeeReader(dataReader, hash)
+ var body = io.TeeReader(dataReader, hash)
proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
@@ -178,8 +273,6 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
resp, postErr := client.Do(proxyReq)
- dataReader.Close()
-
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
return "", ErrInternalError