diff options
Diffstat (limited to 'weed/s3api')
| -rw-r--r-- | weed/s3api/filer_util.go | 55 | ||||
| -rw-r--r-- | weed/s3api/s3api_errors.go | 7 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 93 |
3 files changed, 153 insertions, 2 deletions
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 91c34f0eb..3c11b032c 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -139,6 +139,61 @@ func (s3a *S3ApiServer) rm(ctx context.Context, parentDirectoryPath string, entr } +func (s3a *S3ApiServer) streamRemove(ctx context.Context, quiet bool, + fn func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool), + respFn func(err string)) error { + + return s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + stream, err := client.StreamDeleteEntries(ctx) + if err != nil { + glog.V(0).Infof("stream delete entry: %v", err) + return fmt.Errorf("stream delete entry: %v", err) + } + + waitc := make(chan struct{}) + go func() { + for { + resp, err := stream.Recv() + if err == io.EOF { + // read done. + close(waitc) + return + } + if err != nil { + glog.V(0).Infof("streamRemove: %v", err) + return + } + respFn(resp.Error) + } + }() + + for { + finished, parentDirectoryPath, entryName, isDeleteData, isRecursive := fn() + if finished { + break + } + err = stream.Send(&filer_pb.DeleteEntryRequest{ + Directory: parentDirectoryPath, + Name: entryName, + IsDeleteData: isDeleteData, + IsRecursive: isRecursive, + IgnoreRecursiveError: quiet, + }) + if err != nil { + glog.V(0).Infof("streamRemove: %v", err) + break + } + + } + stream.CloseSend() + <-waitc + return err + + }) + +} + func (s3a *S3ApiServer) exists(ctx context.Context, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) { err = s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/s3api/s3api_errors.go b/weed/s3api/s3api_errors.go index e0f89c2b0..3f97c73cb 100644 --- a/weed/s3api/s3api_errors.go +++ b/weed/s3api/s3api_errors.go @@ -49,6 +49,7 @@ const ( ErrMissingFields ErrMissingCredTag ErrCredMalformed + ErrMalformedXML ErrMalformedDate ErrMalformedPresignedDate ErrMalformedCredentialDate @@ -161,6 +162,12 @@ var errorCodeResponse = map[ErrorCode]APIError{ HTTPStatusCode: http.StatusBadRequest, }, + ErrMalformedXML: { + Code: "MalformedXML", + Description: "The XML you provided was not well-formed or did not validate against our published schema.", + HTTPStatusCode: http.StatusBadRequest, + }, + ErrAuthHeaderEmpty: { Code: "InvalidArgument", Description: "Authorization header is invalid -- one and only one ' ' (space) required.", diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 864376d60..b7bdf334a 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -1,8 +1,10 @@ package s3api import ( + "context" "crypto/md5" "encoding/json" + "encoding/xml" "fmt" "io" "io/ioutil" @@ -115,10 +117,97 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque } +/// ObjectIdentifier carries key name for the object to delete. +type ObjectIdentifier struct { + ObjectName string `xml:"Key"` +} + +// DeleteObjectsRequest - xml carrying the object key names which needs to be deleted. +type DeleteObjectsRequest struct { + // Element to enable quiet mode for the request + Quiet bool + // List of objects to be deleted + Objects []ObjectIdentifier `xml:"Object"` +} + +// DeleteError structure. +type DeleteError struct { + Code string + Message string + Key string +} + +// DeleteObjectsResponse container for multiple object deletes. +type DeleteObjectsResponse struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"` + + // Collection of all deleted objects + DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"` + + // Collection of errors deleting certain objects. + Errors []DeleteError `xml:"Error,omitempty"` +} + // DeleteMultipleObjectsHandler - Delete multiple objects func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) { - // TODO - writeErrorResponse(w, ErrNotImplemented, r.URL) + + vars := mux.Vars(r) + bucket := vars["bucket"] + + deleteXMLBytes, err := ioutil.ReadAll(r.Body) + if err != nil { + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + + deleteObjects := &DeleteObjectsRequest{} + if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil { + writeErrorResponse(w, ErrMalformedXML, r.URL) + return + } + + var index int + + var deletedObjects []ObjectIdentifier + var deleteErrors []DeleteError + s3a.streamRemove(context.Background(), deleteObjects.Quiet, func() (finished bool, parentDirectoryPath string, entryName string, isDeleteData, isRecursive bool) { + if index >= len(deleteObjects.Objects) { + finished = true + return + } + + object := deleteObjects.Objects[index] + + lastSeparator := strings.LastIndex(object.ObjectName, "/") + parentDirectoryPath, entryName, isDeleteData, isRecursive = "/", object.ObjectName, true, false + if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) { + entryName = object.ObjectName[lastSeparator+1:] + parentDirectoryPath = "/" + object.ObjectName[:lastSeparator] + } + parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath) + return + }, func(err string) { + object := deleteObjects.Objects[index] + if err == "" { + deletedObjects = append(deletedObjects, object) + } else { + deleteErrors = append(deleteErrors, DeleteError{ + Code: "", + Message: err, + Key: object.ObjectName, + }) + } + index++ + }) + + deleteResp := DeleteObjectsResponse{} + if !deleteObjects.Quiet { + deleteResp.DeletedObjects = deletedObjects + } + deleteResp.Errors = deleteErrors + + writeSuccessResponseXML(w, encodeResponse(deleteResp)) + } func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) { |
