aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api')
-rw-r--r--weed/s3api/auth_credentials.go8
-rw-r--r--weed/s3api/filer_util_tags.go104
-rw-r--r--weed/s3api/s3api_object_tagging_handlers.go117
-rw-r--r--weed/s3api/s3api_server.go17
-rw-r--r--weed/s3api/s3err/s3api_errors.go6
-rw-r--r--weed/s3api/stats.go27
-rw-r--r--weed/s3api/tags.go38
-rw-r--r--weed/s3api/tags_test.go50
8 files changed, 354 insertions, 13 deletions
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index 31519e6e3..2b7666345 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -16,9 +16,11 @@ import (
type Action string
const (
- ACTION_READ = "Read"
- ACTION_WRITE = "Write"
- ACTION_ADMIN = "Admin"
+ ACTION_READ = "Read"
+ ACTION_WRITE = "Write"
+ ACTION_ADMIN = "Admin"
+ ACTION_TAGGING = "Tagging"
+ ACTION_LIST = "List"
)
type Iam interface {
diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go
new file mode 100644
index 000000000..3d4da7825
--- /dev/null
+++ b/weed/s3api/filer_util_tags.go
@@ -0,0 +1,104 @@
+package s3api
+
+import (
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+const (
+ S3TAG_PREFIX = "s3-"
+)
+
+func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) {
+
+ err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ })
+ if err != nil {
+ return err
+ }
+ tags = make(map[string]string)
+ for k, v := range resp.Entry.Extended {
+ if strings.HasPrefix(k, S3TAG_PREFIX) {
+ tags[k[len(S3TAG_PREFIX):]] = string(v)
+ }
+ }
+ return nil
+ })
+ return
+}
+
+func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) {
+
+ return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ })
+ if err != nil {
+ return err
+ }
+
+ for k, _ := range resp.Entry.Extended {
+ if strings.HasPrefix(k, S3TAG_PREFIX) {
+ delete(resp.Entry.Extended, k)
+ }
+ }
+
+ if resp.Entry.Extended == nil {
+ resp.Entry.Extended = make(map[string][]byte)
+ }
+ for k, v := range tags {
+ resp.Entry.Extended[S3TAG_PREFIX+k] = []byte(v)
+ }
+
+ return filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
+ Directory: parentDirectoryPath,
+ Entry: resp.Entry,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ })
+
+ })
+
+}
+
+func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) {
+
+ return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ })
+ if err != nil {
+ return err
+ }
+
+ hasDeletion := false
+ for k, _ := range resp.Entry.Extended {
+ if strings.HasPrefix(k, S3TAG_PREFIX) {
+ delete(resp.Entry.Extended, k)
+ hasDeletion = true
+ }
+ }
+
+ if !hasDeletion {
+ return nil
+ }
+
+ return filer_pb.UpdateEntry(client, &filer_pb.UpdateEntryRequest{
+ Directory: parentDirectoryPath,
+ Entry: resp.Entry,
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ })
+
+ })
+
+}
diff --git a/weed/s3api/s3api_object_tagging_handlers.go b/weed/s3api/s3api_object_tagging_handlers.go
new file mode 100644
index 000000000..94719834c
--- /dev/null
+++ b/weed/s3api/s3api_object_tagging_handlers.go
@@ -0,0 +1,117 @@
+package s3api
+
+import (
+ "encoding/xml"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "io/ioutil"
+ "net/http"
+)
+
+// GetObjectTaggingHandler - GET object tagging
+// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectTagging.html
+func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ bucket, object := getBucketAndObject(r)
+
+ target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
+ dir, name := target.DirAndName()
+
+ tags, err := s3a.getTags(dir, name)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+ } else {
+ glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ }
+ return
+ }
+
+ writeSuccessResponseXML(w, encodeResponse(FromTags(tags)))
+
+}
+
+// PutObjectTaggingHandler Put object tagging
+// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
+func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ bucket, object := getBucketAndObject(r)
+
+ target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
+ dir, name := target.DirAndName()
+
+ tagging := &Tagging{}
+ input, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength))
+ if err != nil {
+ glog.Errorf("PutObjectTaggingHandler read input %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ return
+ }
+ if err = xml.Unmarshal(input, tagging); err != nil {
+ glog.Errorf("PutObjectTaggingHandler Unmarshal %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrMalformedXML, r.URL)
+ return
+ }
+ tags := tagging.ToTags()
+ if len(tags) > 10 {
+ glog.Errorf("PutObjectTaggingHandler tags %s: %d tags more than 10", r.URL, len(tags))
+ writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
+ return
+ }
+ for k, v := range tags {
+ if len(k) > 128 {
+ glog.Errorf("PutObjectTaggingHandler tags %s: tag key %s longer than 128", r.URL, k)
+ writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
+ return
+ }
+ if len(v) > 256 {
+ glog.Errorf("PutObjectTaggingHandler tags %s: tag value %s longer than 256", r.URL, v)
+ writeErrorResponse(w, s3err.ErrInvalidTag, r.URL)
+ return
+ }
+ }
+
+ if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+ } else {
+ glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ }
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+
+}
+
+// DeleteObjectTaggingHandler Delete object tagging
+// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjectTagging.html
+func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ bucket, object := getBucketAndObject(r)
+
+ target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
+ dir, name := target.DirAndName()
+
+ err := s3a.rmTags(dir, name)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
+ } else {
+ glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
+ writeErrorResponse(w, s3err.ErrInternalError, r.URL)
+ }
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 5ddfdafd0..1ab80c3ee 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -64,9 +64,16 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// AbortMultipartUpload
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.AbortMultipartUploadHandler, ACTION_WRITE), "DELETE")).Queries("uploadId", "{uploadId:.*}")
// ListObjectParts
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_WRITE), "GET")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_READ), "GET")).Queries("uploadId", "{uploadId:.*}")
// ListMultipartUploads
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_WRITE), "GET")).Queries("uploads", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_READ), "GET")).Queries("uploads", "")
+
+ // GetObjectTagging
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectTaggingHandler, ACTION_READ), "GET")).Queries("tagging", "")
+ // PutObjectTagging
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectTaggingHandler, ACTION_TAGGING), "PUT")).Queries("tagging", "")
+ // DeleteObjectTagging
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "")
// CopyObject
bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY"))
@@ -81,11 +88,11 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketHandler, ACTION_WRITE), "DELETE"))
// ListObjectsV2
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_READ), "LIST")).Queries("list-type", "2")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_LIST), "LIST")).Queries("list-type", "2")
// GetObject, but directory listing is not supported
bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectHandler, ACTION_READ), "GET"))
// ListObjectsV1 (Legacy)
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_READ), "LIST"))
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_LIST), "LIST"))
// PostPolicy
bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.PostPolicyBucketHandler, ACTION_WRITE), "POST"))
@@ -112,7 +119,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
}
// ListBuckets
- apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_READ), "LIST"))
+ apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_ADMIN), "LIST"))
// NotFound
apiRouter.NotFoundHandler = http.HandlerFunc(notFoundHandler)
diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go
index cccef0227..f95652afb 100644
--- a/weed/s3api/s3err/s3api_errors.go
+++ b/weed/s3api/s3err/s3api_errors.go
@@ -61,6 +61,7 @@ const (
ErrInternalError
ErrInvalidCopyDest
ErrInvalidCopySource
+ ErrInvalidTag
ErrAuthHeaderEmpty
ErrSignatureVersionNotSupported
ErrMalformedPOSTRequest
@@ -188,6 +189,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Copy Source must mention the source bucket and key: sourcebucket/sourcekey.",
HTTPStatusCode: http.StatusBadRequest,
},
+ ErrInvalidTag: {
+ Code: "InvalidArgument",
+ Description: "The Tag value you have provided is invalid",
+ HTTPStatusCode: http.StatusBadRequest,
+ },
ErrMalformedXML: {
Code: "MalformedXML",
Description: "The XML you provided was not well-formed or did not validate against our published schema.",
diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go
index 16a546c66..b667b32a0 100644
--- a/weed/s3api/stats.go
+++ b/weed/s3api/stats.go
@@ -4,18 +4,35 @@ import (
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
+ "strconv"
"time"
)
-func track(f http.HandlerFunc, action string) http.HandlerFunc {
+type StatusRecorder struct {
+ http.ResponseWriter
+ Status int
+}
- return func(w http.ResponseWriter, r *http.Request) {
+func NewStatusResponseWriter(w http.ResponseWriter) *StatusRecorder {
+ return &StatusRecorder{w, http.StatusOK}
+}
- w.Header().Set("Server", "SeaweedFS S3 "+util.VERSION)
+func (r *StatusRecorder) WriteHeader(status int) {
+ r.Status = status
+ r.ResponseWriter.WriteHeader(status)
+}
+func (r *StatusRecorder) Flush() {
+ r.ResponseWriter.(http.Flusher).Flush()
+}
+
+func track(f http.HandlerFunc, action string) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS S3 "+util.VERSION)
+ recorder := NewStatusResponseWriter(w)
start := time.Now()
- stats_collect.S3RequestCounter.WithLabelValues(action).Inc()
- f(w, r)
+ f(recorder, r)
stats_collect.S3RequestHistogram.WithLabelValues(action).Observe(time.Since(start).Seconds())
+ stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status)).Inc()
}
}
diff --git a/weed/s3api/tags.go b/weed/s3api/tags.go
new file mode 100644
index 000000000..9ff7d1fba
--- /dev/null
+++ b/weed/s3api/tags.go
@@ -0,0 +1,38 @@
+package s3api
+
+import (
+ "encoding/xml"
+)
+
+type Tag struct {
+ Key string `xml:"Key"`
+ Value string `xml:"Value"`
+}
+
+type TagSet struct {
+ Tag []Tag `xml:"Tag"`
+}
+
+type Tagging struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Tagging"`
+ TagSet TagSet `xml:"TagSet"`
+}
+
+func (t *Tagging) ToTags() map[string]string {
+ output := make(map[string]string)
+ for _, tag := range t.TagSet.Tag {
+ output[tag.Key] = tag.Value
+ }
+ return output
+}
+
+func FromTags(tags map[string]string) (t *Tagging) {
+ t = &Tagging{}
+ for k, v := range tags {
+ t.TagSet.Tag = append(t.TagSet.Tag, Tag{
+ Key: k,
+ Value: v,
+ })
+ }
+ return
+}
diff --git a/weed/s3api/tags_test.go b/weed/s3api/tags_test.go
new file mode 100644
index 000000000..887843d6f
--- /dev/null
+++ b/weed/s3api/tags_test.go
@@ -0,0 +1,50 @@
+package s3api
+
+import (
+ "encoding/xml"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestXMLUnmarshall(t *testing.T) {
+
+ input := `<?xml version="1.0" encoding="UTF-8"?>
+<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <TagSet>
+ <Tag>
+ <Key>key1</Key>
+ <Value>value1</Value>
+ </Tag>
+ </TagSet>
+</Tagging>
+`
+
+ tags := &Tagging{}
+
+ xml.Unmarshal([]byte(input), tags)
+
+ assert.Equal(t, len(tags.TagSet.Tag), 1)
+ assert.Equal(t, tags.TagSet.Tag[0].Key, "key1")
+ assert.Equal(t, tags.TagSet.Tag[0].Value, "value1")
+
+}
+
+func TestXMLMarshall(t *testing.T) {
+ tags := &Tagging{
+ TagSet: TagSet{
+ []Tag{
+ {
+ Key: "key1",
+ Value: "value1",
+ },
+ },
+ },
+ }
+
+ actual := string(encodeResponse(tags))
+
+ expected := `<?xml version="1.0" encoding="UTF-8"?>
+<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><TagSet><Tag><Key>key1</Key><Value>value1</Value></Tag></TagSet></Tagging>`
+ assert.Equal(t, expected, actual)
+
+}