author     yulai.li <blacktear23@gmail.com>  2022-06-26 22:43:37 +0800
committer  yulai.li <blacktear23@gmail.com>  2022-06-26 22:43:37 +0800
commit     46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree       734125b48b6d96f8796a2b89b924312cd169ef0e /weed/s3api
parent     a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parent     dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
download   seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz
           seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip
Update TiKV client version and add one-phase commit (1PC) support
Diffstat (limited to 'weed/s3api')
-rw-r--r--  weed/s3api/auth_credentials.go                  | 120
-rw-r--r--  weed/s3api/auth_credentials_subscribe.go        |  42
-rw-r--r--  weed/s3api/auth_credentials_test.go             |  59
-rw-r--r--  weed/s3api/auth_signature_v4.go                 |  13
-rw-r--r--  weed/s3api/auto_signature_v4_test.go            |   8
-rw-r--r--  weed/s3api/chunked_reader_v4.go                 |   5
-rw-r--r--  weed/s3api/filer_multipart.go                   |  79
-rw-r--r--  weed/s3api/filer_multipart_test.go              |  87
-rw-r--r--  weed/s3api/filer_util.go                        |  11
-rw-r--r--  weed/s3api/filer_util_tags.go                   |  10
-rw-r--r--  weed/s3api/http/header.go                       |  36
-rw-r--r--  weed/s3api/s3_constants/header.go               |  66
-rw-r--r--  weed/s3api/s3_constants/s3_actions.go           |   2
-rw-r--r--  weed/s3api/s3_constants/s3_config.go            |  18
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go             | 197
-rw-r--r--  weed/s3api/s3api_bucket_skip_handlers.go        |  49
-rw-r--r--  weed/s3api/s3api_circuit_breaker.go             | 183
-rw-r--r--  weed/s3api/s3api_circuit_breaker_test.go        | 107
-rw-r--r--  weed/s3api/s3api_handlers.go                    |  16
-rw-r--r--  weed/s3api/s3api_object_copy_handlers.go        | 188
-rw-r--r--  weed/s3api/s3api_object_copy_handlers_test.go   | 426
-rw-r--r--  weed/s3api/s3api_object_handlers.go             | 255
-rw-r--r--  weed/s3api/s3api_object_handlers_postpolicy.go  |  52
-rw-r--r--  weed/s3api/s3api_object_multipart_handlers.go   | 160
-rw-r--r--  weed/s3api/s3api_object_skip_handlers.go        |  45
-rw-r--r--  weed/s3api/s3api_object_tagging_handlers.go     |  47
-rw-r--r--  weed/s3api/s3api_objects_list_handlers.go       | 202
-rw-r--r--  weed/s3api/s3api_policy.go                      | 148
-rw-r--r--  weed/s3api/s3api_server.go                      | 212
-rw-r--r--  weed/s3api/s3api_server_grpc.go                 |  16
-rw-r--r--  weed/s3api/s3api_status_handlers.go             |   7
-rw-r--r--  weed/s3api/s3api_xsd_generated.go               |  20
-rw-r--r--  weed/s3api/s3err/audit_fluent.go                | 183
-rw-r--r--  weed/s3api/s3err/error_handler.go               |  26
-rw-r--r--  weed/s3api/s3err/s3-error.go                    |   1
-rw-r--r--  weed/s3api/s3err/s3api_errors.go                |  51
-rw-r--r--  weed/s3api/stats.go                             |   9
-rw-r--r--  weed/s3api/tags.go                              |   5
-rw-r--r--  weed/s3api/tags_test.go                         |   1
39 files changed, 2642 insertions(+), 520 deletions(-)
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index 44c3f7aa7..fb23d9ce9 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -2,17 +2,18 @@ package s3api
import (
"fmt"
+ "net/http"
+ "os"
+ "strings"
+ "sync"
+
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
- xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
- "io/ioutil"
- "net/http"
- "strings"
)
type Action string
@@ -22,8 +23,11 @@ type Iam interface {
}
type IdentityAccessManagement struct {
- identities []*Identity
- domain string
+ m sync.RWMutex
+
+ identities []*Identity
+ isAuthEnabled bool
+ domain string
}
type Identity struct {
@@ -37,6 +41,31 @@ type Credential struct {
SecretKey string
}
+func (action Action) isAdmin() bool {
+ return strings.HasPrefix(string(action), s3_constants.ACTION_ADMIN)
+}
+
+func (action Action) isOwner(bucket string) bool {
+ return string(action) == s3_constants.ACTION_ADMIN+":"+bucket
+}
+
+func (action Action) overBucket(bucket string) bool {
+ return strings.HasSuffix(string(action), ":"+bucket) || strings.HasSuffix(string(action), ":*")
+}
+
+func (action Action) getPermission() Permission {
+ switch act := strings.Split(string(action), ":")[0]; act {
+ case s3_constants.ACTION_ADMIN:
+ return Permission("FULL_CONTROL")
+ case s3_constants.ACTION_WRITE:
+ return Permission("WRITE")
+ case s3_constants.ACTION_READ:
+ return Permission("READ")
+ default:
+ return Permission("")
+ }
+}
+
func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManagement {
iam := &IdentityAccessManagement{
domain: option.DomainName,
@@ -55,26 +84,26 @@ func NewIdentityAccessManagement(option *S3ApiServerOption) *IdentityAccessManag
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFiler(option *S3ApiServerOption) (err error) {
var content []byte
- err = pb.WithFilerClient(option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
content, err = filer.ReadInsideFiler(client, filer.IamConfigDirecotry, filer.IamIdentityFile)
return err
})
if err != nil {
return fmt.Errorf("read S3 config: %v", err)
}
- return iam.loadS3ApiConfigurationFromBytes(content)
+ return iam.LoadS3ApiConfigurationFromBytes(content)
}
func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromFile(fileName string) error {
- content, readErr := ioutil.ReadFile(fileName)
+ content, readErr := os.ReadFile(fileName)
if readErr != nil {
glog.Warningf("fail to read %s : %v", fileName, readErr)
return fmt.Errorf("fail to read %s : %v", fileName, readErr)
}
- return iam.loadS3ApiConfigurationFromBytes(content)
+ return iam.LoadS3ApiConfigurationFromBytes(content)
}
-func (iam *IdentityAccessManagement) loadS3ApiConfigurationFromBytes(content []byte) error {
+func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromBytes(content []byte) error {
s3ApiConfiguration := &iam_pb.S3ApiConfiguration{}
if err := filer.ParseS3ConfigurationFromBytes(content, s3ApiConfiguration); err != nil {
glog.Warningf("unmarshal error: %v", err)
@@ -105,31 +134,39 @@ func (iam *IdentityAccessManagement) loadS3ApiConfiguration(config *iam_pb.S3Api
}
identities = append(identities, t)
}
-
+ iam.m.Lock()
// atomically switch
iam.identities = identities
+ if !iam.isAuthEnabled { // one-directional, no toggling
+ iam.isAuthEnabled = len(identities) > 0
+ }
+ iam.m.Unlock()
return nil
}
func (iam *IdentityAccessManagement) isEnabled() bool {
-
- return len(iam.identities) > 0
+ return iam.isAuthEnabled
}
func (iam *IdentityAccessManagement) lookupByAccessKey(accessKey string) (identity *Identity, cred *Credential, found bool) {
+ iam.m.RLock()
+ defer iam.m.RUnlock()
for _, ident := range iam.identities {
for _, cred := range ident.Credentials {
+ // println("checking", ident.Name, cred.AccessKey)
if cred.AccessKey == accessKey {
return ident, cred, true
}
}
}
+ glog.V(1).Infof("could not find accessKey %s", accessKey)
return nil, nil, false
}
func (iam *IdentityAccessManagement) lookupAnonymous() (identity *Identity, found bool) {
-
+ iam.m.RLock()
+ defer iam.m.RUnlock()
for _, ident := range iam.identities {
if ident.Name == "anonymous" {
return ident, true
@@ -139,24 +176,26 @@ func (iam *IdentityAccessManagement) lookupAnonymous() (identity *Identity, foun
}
func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) http.HandlerFunc {
-
- if !iam.isEnabled() {
- return f
- }
-
return func(w http.ResponseWriter, r *http.Request) {
+ if !iam.isEnabled() {
+ f(w, r)
+ return
+ }
+
identity, errCode := iam.authRequest(r, action)
if errCode == s3err.ErrNone {
if identity != nil && identity.Name != "" {
- r.Header.Set(xhttp.AmzIdentityId, identity.Name)
+ r.Header.Set(s3_constants.AmzIdentityId, identity.Name)
if identity.isAdmin() {
- r.Header.Set(xhttp.AmzIsAdmin, "true")
+ r.Header.Set(s3_constants.AmzIsAdmin, "true")
+ } else if _, ok := r.Header[s3_constants.AmzIsAdmin]; ok {
+ r.Header.Del(s3_constants.AmzIsAdmin)
}
}
f(w, r)
return
}
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
}
}
@@ -165,42 +204,53 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
var identity *Identity
var s3Err s3err.ErrorCode
var found bool
+ var authType string
switch getRequestAuthType(r) {
case authTypeStreamingSigned:
return identity, s3err.ErrNone
case authTypeUnknown:
glog.V(3).Infof("unknown auth type")
+ r.Header.Set(s3_constants.AmzAuthType, "Unknown")
return identity, s3err.ErrAccessDenied
case authTypePresignedV2, authTypeSignedV2:
glog.V(3).Infof("v2 auth type")
identity, s3Err = iam.isReqAuthenticatedV2(r)
+ authType = "SigV2"
case authTypeSigned, authTypePresigned:
glog.V(3).Infof("v4 auth type")
identity, s3Err = iam.reqSignatureV4Verify(r)
+ authType = "SigV4"
case authTypePostPolicy:
glog.V(3).Infof("post policy auth type")
+ r.Header.Set(s3_constants.AmzAuthType, "PostPolicy")
return identity, s3err.ErrNone
case authTypeJWT:
glog.V(3).Infof("jwt auth type")
+ r.Header.Set(s3_constants.AmzAuthType, "Jwt")
return identity, s3err.ErrNotImplemented
case authTypeAnonymous:
+ authType = "Anonymous"
identity, found = iam.lookupAnonymous()
if !found {
+ r.Header.Set(s3_constants.AmzAuthType, authType)
return identity, s3err.ErrAccessDenied
}
default:
return identity, s3err.ErrNotImplemented
}
+ if len(authType) > 0 {
+ r.Header.Set(s3_constants.AmzAuthType, authType)
+ }
if s3Err != s3err.ErrNone {
return identity, s3Err
}
glog.V(3).Infof("user name: %v actions: %v, action: %v", identity.Name, identity.Actions, action)
- bucket, _ := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
- if !identity.canDo(action, bucket) {
+ if !identity.canDo(action, bucket, object) {
return identity, s3err.ErrAccessDenied
}
@@ -212,33 +262,45 @@ func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err
var identity *Identity
var s3Err s3err.ErrorCode
var found bool
+ var authType string
switch getRequestAuthType(r) {
case authTypeStreamingSigned:
return identity, s3err.ErrNone
case authTypeUnknown:
glog.V(3).Infof("unknown auth type")
+ r.Header.Set(s3_constants.AmzAuthType, "Unknown")
return identity, s3err.ErrAccessDenied
case authTypePresignedV2, authTypeSignedV2:
glog.V(3).Infof("v2 auth type")
identity, s3Err = iam.isReqAuthenticatedV2(r)
+ authType = "SigV2"
case authTypeSigned, authTypePresigned:
glog.V(3).Infof("v4 auth type")
identity, s3Err = iam.reqSignatureV4Verify(r)
+ authType = "SigV4"
case authTypePostPolicy:
glog.V(3).Infof("post policy auth type")
+ r.Header.Set(s3_constants.AmzAuthType, "PostPolicy")
return identity, s3err.ErrNone
case authTypeJWT:
glog.V(3).Infof("jwt auth type")
+ r.Header.Set(s3_constants.AmzAuthType, "Jwt")
return identity, s3err.ErrNotImplemented
case authTypeAnonymous:
+ authType = "Anonymous"
identity, found = iam.lookupAnonymous()
if !found {
+ r.Header.Set(s3_constants.AmzAuthType, authType)
return identity, s3err.ErrAccessDenied
}
default:
return identity, s3err.ErrNotImplemented
}
+ if len(authType) > 0 {
+ r.Header.Set(s3_constants.AmzAuthType, authType)
+ }
+
glog.V(3).Infof("auth error: %v", s3Err)
if s3Err != s3err.ErrNone {
return identity, s3Err
@@ -246,7 +308,7 @@ func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err
return identity, s3err.ErrNone
}
-func (identity *Identity) canDo(action Action, bucket string) bool {
+func (identity *Identity) canDo(action Action, bucket string, objectKey string) bool {
if identity.isAdmin() {
return true
}
@@ -258,15 +320,17 @@ func (identity *Identity) canDo(action Action, bucket string) bool {
if bucket == "" {
return false
}
+ target := string(action) + ":" + bucket + objectKey
+ adminTarget := s3_constants.ACTION_ADMIN + ":" + bucket + objectKey
limitedByBucket := string(action) + ":" + bucket
adminLimitedByBucket := s3_constants.ACTION_ADMIN + ":" + bucket
for _, a := range identity.Actions {
act := string(a)
if strings.HasSuffix(act, "*") {
- if strings.HasPrefix(limitedByBucket, act[:len(act)-1]) {
+ if strings.HasPrefix(target, act[:len(act)-1]) {
return true
}
- if strings.HasPrefix(adminLimitedByBucket, act[:len(act)-1]) {
+ if strings.HasPrefix(adminTarget, act[:len(act)-1]) {
return true
}
} else {
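
The canDo change above extends wildcard grants from the bucket level to the object level: an action such as "Write:bucket1/a/*" now matches any object key under that prefix. A minimal standalone sketch of the same prefix logic (hypothetical helper, not part of this commit):

package example

import "strings"

// matches sketches the wildcard branch of canDo above: a granted action is
// compared as a prefix against the "action:bucket/objectKey" target.
func matches(granted, action, bucket, objectKey string) bool {
	target := action + ":" + bucket + objectKey // e.g. "Write:bucket1/a/b.txt"
	if strings.HasSuffix(granted, "*") {
		return strings.HasPrefix(target, granted[:len(granted)-1])
	}
	return granted == action+":"+bucket // non-wildcard grants stay bucket-level
}

// matches("Write:bucket1/a/*", "Write", "bucket1", "/a/b.txt") == true
// matches("Write:bucket1/a/*", "Write", "bucket2", "/a/b.txt") == false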
diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go
index 05cce632a..f2bd94f56 100644
--- a/weed/s3api/auth_credentials_subscribe.go
+++ b/weed/s3api/auth_credentials_subscribe.go
@@ -5,10 +5,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error {
+func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) {
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
@@ -22,18 +23,41 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
if message.NewParentPath != "" {
dir = message.NewParentPath
}
- if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile {
- if err := s3a.iam.loadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil {
- return err
- }
- glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile)
- }
+ fileName := message.NewEntry.Name
+ content := message.NewEntry.Content
+
+ _ = s3a.onIamConfigUpdate(dir, fileName, content)
+ _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
return nil
}
- return util.Retry("followIamChanges", func() error {
- return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, lastTsNs, 0, processEventFn, true)
+ util.RetryForever("followIamChanges", func() error {
+ return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, prefix, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError)
+ }, func(err error) bool {
+ glog.V(0).Infof("iam follow metadata changes: %v", err)
+ return true
})
+}
+// reload iam config
+func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error {
+ if dir == filer.IamConfigDirecotry && filename == filer.IamIdentityFile {
+ if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil {
+ return err
+ }
+ glog.V(0).Infof("updated %s/%s", dir, filename)
+ }
+ return nil
+}
+
+// reload circuit breaker config
+func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error {
+ if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile {
+ if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil {
+ return err
+ }
+ glog.V(0).Infof("updated %s/%s", dir, filename)
+ }
+ return nil
}
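
Two things change here: the metadata follower now retries forever instead of returning after a bounded number of attempts, and the event handler dispatches to per-file reload callbacks so both hot-reloadable configs (IAM identities and the circuit breaker) share one subscription. A sketch of the retry loop's semantics, assuming util.RetryForever keeps calling the job while the continuation callback returns true (not the library's actual implementation):

package example

import "time"

// retryForever approximates the util.RetryForever semantics relied on above:
// run the job, and on error consult shouldContinue before backing off and
// resubscribing (the real follower resumes from lastTsNs).
func retryForever(job func() error, shouldContinue func(error) bool, wait time.Duration) {
	for {
		if err := job(); err != nil && shouldContinue(err) {
			time.Sleep(wait)
			continue
		}
		return
	}
}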
diff --git a/weed/s3api/auth_credentials_test.go b/weed/s3api/auth_credentials_test.go
index 0383ddbcd..4545d13bc 100644
--- a/weed/s3api/auth_credentials_test.go
+++ b/weed/s3api/auth_credentials_test.go
@@ -2,6 +2,7 @@ package s3api
import (
. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/stretchr/testify/assert"
"testing"
"github.com/golang/protobuf/jsonpb"
@@ -67,3 +68,61 @@ func TestIdentityListFileFormat(t *testing.T) {
println(text)
}
+
+func TestCanDo(t *testing.T) {
+ ident1 := &Identity{
+ Name: "anything",
+ Actions: []Action{
+ "Write:bucket1/a/b/c/*",
+ "Write:bucket1/a/b/other",
+ },
+ }
+ // object specific
+ assert.Equal(t, true, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt"))
+ assert.Equal(t, false, ident1.canDo(ACTION_WRITE, "bucket1", "/a/b/other/some"), "action without *")
+
+ // bucket specific
+ ident2 := &Identity{
+ Name: "anything",
+ Actions: []Action{
+ "Read:bucket1",
+ "Write:bucket1/*",
+ },
+ }
+ assert.Equal(t, true, ident2.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt"))
+ assert.Equal(t, true, ident2.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt"))
+ assert.Equal(t, false, ident2.canDo(ACTION_LIST, "bucket1", "/a/b/c/d.txt"))
+
+ // across buckets
+ ident3 := &Identity{
+ Name: "anything",
+ Actions: []Action{
+ "Read",
+ "Write",
+ },
+ }
+ assert.Equal(t, true, ident3.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt"))
+ assert.Equal(t, true, ident3.canDo(ACTION_WRITE, "bucket1", "/a/b/c/d.txt"))
+ assert.Equal(t, false, ident3.canDo(ACTION_LIST, "bucket1", "/a/b/other/some"))
+
+ // partial buckets
+ ident4 := &Identity{
+ Name: "anything",
+ Actions: []Action{
+ "Read:special_*",
+ },
+ }
+ assert.Equal(t, true, ident4.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt"))
+ assert.Equal(t, false, ident4.canDo(ACTION_READ, "bucket1", "/a/b/c/d.txt"))
+
+ // admin buckets
+ ident5 := &Identity{
+ Name: "anything",
+ Actions: []Action{
+ "Admin:special_*",
+ },
+ }
+ assert.Equal(t, true, ident5.canDo(ACTION_READ, "special_bucket", "/a/b/c/d.txt"))
+ assert.Equal(t, true, ident5.canDo(ACTION_WRITE, "special_bucket", "/a/b/c/d.txt"))
+
+}
diff --git a/weed/s3api/auth_signature_v4.go b/weed/s3api/auth_signature_v4.go
index 0df26e6fc..a49caad06 100644
--- a/weed/s3api/auth_signature_v4.go
+++ b/weed/s3api/auth_signature_v4.go
@@ -23,8 +23,7 @@ import (
"crypto/sha256"
"crypto/subtle"
"encoding/hex"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
- "io/ioutil"
+ "io"
"net/http"
"net/url"
"regexp"
@@ -33,6 +32,8 @@ import (
"strings"
"time"
"unicode/utf8"
+
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
)
func (iam *IdentityAccessManagement) reqSignatureV4Verify(r *http.Request) (*Identity, s3err.ErrorCode) {
@@ -135,9 +136,9 @@ func (iam *IdentityAccessManagement) doesSignatureMatch(hashedPayload string, r
// Get hashed Payload
if signV4Values.Credential.scope.service != "s3" && hashedPayload == emptySHA256 && r.Body != nil {
- buf, _ := ioutil.ReadAll(r.Body)
- r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
- b, _ := ioutil.ReadAll(bytes.NewBuffer(buf))
+ buf, _ := io.ReadAll(r.Body)
+ r.Body = io.NopCloser(bytes.NewBuffer(buf))
+ b, _ := io.ReadAll(bytes.NewBuffer(buf))
if len(b) != 0 {
bodyHash := sha256.Sum256(b)
hashedPayload = hex.EncodeToString(bodyHash[:])
@@ -433,7 +434,7 @@ func (iam *IdentityAccessManagement) doesPresignedSignatureMatch(hashedPayload s
}
}
- /// Verify finally if signature is same.
+ // / Verify finally if signature is same.
// Get canonical request.
presignedCanonicalReq := getCanonicalRequest(extractedSignedHeaders, hashedPayload, encodedQuery, req.URL.Path, req.Method)
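
The ioutil-to-io migration above keeps an important pattern intact: to hash a request body that downstream handlers still need, the body is read fully and then reinstated. A self-contained sketch of that pattern:

package example

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"io"
	"net/http"
)

// peekBodySHA256 hashes a request body without consuming it, mirroring the
// io.ReadAll + io.NopCloser pattern in doesSignatureMatch above.
func peekBodySHA256(r *http.Request) (string, error) {
	buf, err := io.ReadAll(r.Body)
	if err != nil {
		return "", err
	}
	r.Body = io.NopCloser(bytes.NewReader(buf)) // restore for later readers
	sum := sha256.Sum256(buf)
	return hex.EncodeToString(sum[:]), nil
}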
diff --git a/weed/s3api/auto_signature_v4_test.go b/weed/s3api/auto_signature_v4_test.go
index b47cd5f2d..a58551187 100644
--- a/weed/s3api/auto_signature_v4_test.go
+++ b/weed/s3api/auto_signature_v4_test.go
@@ -8,9 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"io"
- "io/ioutil"
"net/http"
"net/url"
"sort"
@@ -19,6 +17,8 @@ import (
"testing"
"time"
"unicode/utf8"
+
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
)
// TestIsRequestPresignedSignatureV4 - Test validates the logic for presigned signature version v4 detection.
@@ -86,7 +86,7 @@ func TestIsReqAuthenticated(t *testing.T) {
// Validates all testcases.
for i, testCase := range testCases {
if _, s3Error := iam.reqSignatureV4Verify(testCase.req); s3Error != testCase.s3Error {
- ioutil.ReadAll(testCase.req.Body)
+ io.ReadAll(testCase.req.Body)
t.Fatalf("Test %d: Unexpected S3 error: want %d - got %d", i, testCase.s3Error, s3Error)
}
}
@@ -167,7 +167,7 @@ func newTestRequest(method, urlStr string, contentLength int64, body io.ReadSeek
case body == nil:
hashedPayload = getSHA256Hash([]byte{})
default:
- payloadBytes, err := ioutil.ReadAll(body)
+ payloadBytes, err := io.ReadAll(body)
if err != nil {
return nil, err
}
diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go
index ec26f693a..2678f312f 100644
--- a/weed/s3api/chunked_reader_v4.go
+++ b/weed/s3api/chunked_reader_v4.go
@@ -24,6 +24,7 @@ import (
"crypto/sha256"
"encoding/hex"
"errors"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"hash"
"io"
@@ -90,8 +91,8 @@ func (iam *IdentityAccessManagement) calculateSeedSignature(r *http.Request) (cr
return nil, "", "", time.Time{}, s3err.ErrInvalidAccessKeyID
}
- bucket, _ := getBucketAndObject(r)
- if !identity.canDo("Write", bucket) {
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ if !identity.canDo(s3_constants.ACTION_WRITE, bucket, object) {
errCode = s3err.ErrAccessDenied
return
}
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 2b6707f2e..32b93307a 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -1,17 +1,20 @@
package s3api
import (
+ "encoding/hex"
"encoding/xml"
"fmt"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "golang.org/x/exp/slices"
+ "math"
"path/filepath"
+ "sort"
"strconv"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
- "github.com/google/uuid"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -27,8 +30,7 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
glog.V(2).Infof("createMultipartUpload input %v", input)
- uploadId, _ := uuid.NewRandom()
- uploadIdString := uploadId.String()
+ uploadIdString := s3a.generateUploadID(*input.Key)
if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
@@ -38,6 +40,9 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
for k, v := range input.Metadata {
entry.Extended[k] = []byte(*v)
}
+ if input.ContentType != nil {
+ entry.Attributes.Mime = *input.ContentType
+ }
}); err != nil {
glog.Errorf("NewMultipartUpload error: %v", err)
return nil, s3err.ErrInternalError
@@ -59,13 +64,18 @@ type CompleteMultipartUploadResult struct {
s3.CompleteMultipartUploadOutput
}
-func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
+func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input)
+ completedParts := parts.Parts
+ slices.SortFunc(completedParts, func(a, b CompletedPart) bool {
+ return a.PartNumber < b.PartNumber
+ })
+
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
- entries, _, err := s3a.list(uploadDirectory, "", "", false, 0)
+ entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList)
if err != nil || len(entries) == 0 {
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
return nil, s3err.ErrNoSuchUpload
@@ -77,11 +87,22 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, s3err.ErrNoSuchUpload
}
+ mime := pentry.Attributes.Mime
+
var finalParts []*filer_pb.FileChunk
var offset int64
for _, entry := range entries {
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
+ partETag, found := findByPartNumber(entry.Name, completedParts)
+ if !found {
+ continue
+ }
+ entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
+ if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
+ glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
+ return nil, s3err.ErrInvalidPart
+ }
for _, chunk := range entry.Chunks {
p := &filer_pb.FileChunk{
FileId: chunk.GetFileIdString(),
@@ -121,6 +142,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
entry.Extended[k] = v
}
}
+ if pentry.Attributes.Mime != "" {
+ entry.Attributes.Mime = pentry.Attributes.Mime
+ } else if mime != "" {
+ entry.Attributes.Mime = mime
+ }
})
if err != nil {
@@ -130,7 +156,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
output = &CompleteMultipartUploadResult{
CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer, dirName, entryName)),
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlPathEscape(dirName), urlPathEscape(entryName))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
@@ -144,6 +170,31 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return
}
+func findByPartNumber(fileName string, parts []CompletedPart) (etag string, found bool) {
+ partNumber, formatErr := strconv.Atoi(fileName[:4])
+ if formatErr != nil {
+ return
+ }
+ x := sort.Search(len(parts), func(i int) bool {
+ return parts[i].PartNumber >= partNumber
+ })
+ if x >= len(parts) {
+ return
+ }
+ if parts[x].PartNumber != partNumber {
+ return
+ }
+ y := 0
+ for i, part := range parts[x:] {
+ if part.PartNumber == partNumber {
+ y = i
+ } else {
+ break
+ }
+ }
+ return parts[x+y].ETag, true
+}
+
func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
glog.V(2).Infof("abortMultipartUpload input %v", input)
@@ -195,13 +246,13 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
Prefix: input.Prefix,
}
- entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, uint32(*input.MaxUploads))
+ entries, _, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, math.MaxInt32)
if err != nil {
glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err)
return
}
- output.IsTruncated = aws.Bool(!isLast)
+ uploadsCount := int64(0)
for _, entry := range entries {
if entry.Extended != nil {
key := string(entry.Extended["key"])
@@ -215,9 +266,12 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
Key: objectKey(aws.String(key)),
UploadId: aws.String(entry.Name),
})
- if !isLast {
- output.NextUploadIdMarker = aws.String(entry.Name)
- }
+ uploadsCount += 1
+ }
+ if uploadsCount >= *input.MaxUploads {
+ output.IsTruncated = aws.Bool(true)
+ output.NextUploadIdMarker = aws.String(entry.Name)
+ break
}
}
@@ -259,6 +313,9 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
return nil, s3err.ErrNoSuchUpload
}
+ // Note: The upload directory is a marker of the existence of a multipart upload request,
+ // so we cannot simply delete empty upload folders.
+
output.IsTruncated = aws.Bool(!isLast)
for _, entry := range entries {
diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go
index 9e1d2307b..fe2b9c0ce 100644
--- a/weed/s3api/filer_multipart_test.go
+++ b/weed/s3api/filer_multipart_test.go
@@ -4,6 +4,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/stretchr/testify/assert"
"testing"
"time"
)
@@ -48,3 +49,89 @@ func TestListPartsResult(t *testing.T) {
}
}
+
+func Test_findByPartNumber(t *testing.T) {
+ type args struct {
+ fileName string
+ parts []CompletedPart
+ }
+
+ parts := []CompletedPart{
+ CompletedPart{
+ ETag: "xxx",
+ PartNumber: 1,
+ },
+ CompletedPart{
+ ETag: "lll",
+ PartNumber: 1,
+ },
+ CompletedPart{
+ ETag: "yyy",
+ PartNumber: 3,
+ },
+ CompletedPart{
+ ETag: "zzz",
+ PartNumber: 5,
+ },
+ }
+
+ tests := []struct {
+ name string
+ args args
+ wantEtag string
+ wantFound bool
+ }{
+ {
+ "first",
+ args{
+ "0001.part",
+ parts,
+ },
+ "lll",
+ true,
+ },
+ {
+ "second",
+ args{
+ "0002.part",
+ parts,
+ },
+ "",
+ false,
+ },
+ {
+ "third",
+ args{
+ "0003.part",
+ parts,
+ },
+ "yyy",
+ true,
+ },
+ {
+ "fourth",
+ args{
+ "0004.part",
+ parts,
+ },
+ "",
+ false,
+ },
+ {
+ "fifth",
+ args{
+ "0005.part",
+ parts,
+ },
+ "zzz",
+ true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ gotEtag, gotFound := findByPartNumber(tt.args.fileName, tt.args.parts)
+ assert.Equalf(t, tt.wantEtag, gotEtag, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
+ assert.Equalf(t, tt.wantFound, gotFound, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
+ })
+ }
+}
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 888003e45..dbd667339 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -41,7 +41,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData, isRecursive bool) error {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err != nil {
@@ -55,10 +55,11 @@ func (s3a *S3ApiServer) rm(parentDirectoryPath, entryName string, isDeleteData,
func doDeleteEntry(client filer_pb.SeaweedFilerClient, parentDirectoryPath string, entryName string, isDeleteData bool, isRecursive bool) error {
request := &filer_pb.DeleteEntryRequest{
- Directory: parentDirectoryPath,
- Name: entryName,
- IsDeleteData: isDeleteData,
- IsRecursive: isRecursive,
+ Directory: parentDirectoryPath,
+ Name: entryName,
+ IsDeleteData: isDeleteData,
+ IsRecursive: isRecursive,
+ IgnoreRecursiveError: true,
}
glog.V(1).Infof("delete entry %v/%v: %v", parentDirectoryPath, entryName, request)
diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go
index 75d3b37d0..18d4d69c5 100644
--- a/weed/s3api/filer_util_tags.go
+++ b/weed/s3api/filer_util_tags.go
@@ -1,19 +1,19 @@
package s3api
import (
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"strings"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
)
const (
- S3TAG_PREFIX = xhttp.AmzObjectTagging + "-"
+ S3TAG_PREFIX = s3_constants.AmzObjectTagging + "-"
)
func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) {
- err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
@@ -35,7 +35,7 @@ func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (t
func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, tags map[string]string) (err error) {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
@@ -71,7 +71,7 @@ func (s3a *S3ApiServer) setTags(parentDirectoryPath string, entryName string, ta
func (s3a *S3ApiServer) rmTags(parentDirectoryPath string, entryName string) (err error) {
- return s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go
deleted file mode 100644
index 6614b0af0..000000000
--- a/weed/s3api/http/header.go
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package http
-
-// Standard S3 HTTP request constants
-const (
- // S3 storage class
- AmzStorageClass = "x-amz-storage-class"
-
- // S3 user-defined metadata
- AmzUserMetaPrefix = "X-Amz-Meta-"
-
- // S3 object tagging
- AmzObjectTagging = "X-Amz-Tagging"
- AmzTagCount = "x-amz-tagging-count"
-)
-
-// Non-Standard S3 HTTP request constants
-const (
- AmzIdentityId = "s3-identity-id"
- AmzIsAdmin = "s3-is-admin" // only set to http request header as a context
-)
diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go
new file mode 100644
index 000000000..cd725d435
--- /dev/null
+++ b/weed/s3api/s3_constants/header.go
@@ -0,0 +1,66 @@
+/*
+ * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package s3_constants
+
+import (
+ "github.com/gorilla/mux"
+ "net/http"
+ "strings"
+)
+
+// Standard S3 HTTP request constants
+const (
+ // S3 storage class
+ AmzStorageClass = "x-amz-storage-class"
+
+ // S3 user-defined metadata
+ AmzUserMetaPrefix = "X-Amz-Meta-"
+ AmzUserMetaDirective = "X-Amz-Metadata-Directive"
+
+ // S3 object tagging
+ AmzObjectTagging = "X-Amz-Tagging"
+ AmzObjectTaggingPrefix = "X-Amz-Tagging-"
+ AmzObjectTaggingDirective = "X-Amz-Tagging-Directive"
+ AmzTagCount = "x-amz-tagging-count"
+)
+
+// Non-Standard S3 HTTP request constants
+const (
+ AmzIdentityId = "s3-identity-id"
+ AmzAuthType = "s3-auth-type"
+ AmzIsAdmin = "s3-is-admin" // only set to http request header as a context
+)
+
+func GetBucketAndObject(r *http.Request) (bucket, object string) {
+ vars := mux.Vars(r)
+ bucket = vars["bucket"]
+ object = vars["object"]
+ if !strings.HasPrefix(object, "/") {
+ object = "/" + object
+ }
+
+ return
+}
+
+var PassThroughHeaders = map[string]string{
+ "response-cache-control": "Cache-Control",
+ "response-content-disposition": "Content-Disposition",
+ "response-content-encoding": "Content-Encoding",
+ "response-content-language": "Content-Language",
+ "response-content-type": "Content-Type",
+ "response-expires": "Expires",
+}
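
PassThroughHeaders maps the standard S3 "response-*" query parameters onto response headers. A sketch of how such a table is typically consumed (hypothetical helper, not part of this commit):

package example

import (
	"net/http"
	"strings"
)

// applyPassThrough copies response-* query overrides onto the response,
// using a table shaped like PassThroughHeaders above.
func applyPassThrough(w http.ResponseWriter, r *http.Request, table map[string]string) {
	for key, values := range r.URL.Query() {
		if header, ok := table[strings.ToLower(key)]; ok && len(values) > 0 {
			w.Header().Set(header, values[0])
		}
	}
}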
diff --git a/weed/s3api/s3_constants/s3_actions.go b/weed/s3api/s3_constants/s3_actions.go
index 4e484ac98..0fbf134e3 100644
--- a/weed/s3api/s3_constants/s3_actions.go
+++ b/weed/s3api/s3_constants/s3_actions.go
@@ -6,4 +6,6 @@ const (
ACTION_ADMIN = "Admin"
ACTION_TAGGING = "Tagging"
ACTION_LIST = "List"
+
+ SeaweedStorageDestinationHeader = "x-seaweedfs-destination"
)
diff --git a/weed/s3api/s3_constants/s3_config.go b/weed/s3api/s3_constants/s3_config.go
new file mode 100644
index 000000000..0fa5b26f4
--- /dev/null
+++ b/weed/s3api/s3_constants/s3_config.go
@@ -0,0 +1,18 @@
+package s3_constants
+
+import (
+ "strings"
+)
+
+var (
+ CircuitBreakerConfigDir = "/etc/s3"
+ CircuitBreakerConfigFile = "circuit_breaker.json"
+ AllowedActions = []string{ACTION_READ, ACTION_WRITE, ACTION_LIST, ACTION_TAGGING, ACTION_ADMIN}
+ LimitTypeCount = "Count"
+ LimitTypeBytes = "MB"
+ Separator = ":"
+)
+
+func Concat(elements ...string) string {
+ return strings.Join(elements, Separator)
+}
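
Concat builds the colon-separated keys the circuit breaker uses for its counters; the same helper yields both the global and the per-bucket key shapes consulted by limit() further below. For illustration:

package example

import "strings"

// concat mirrors s3_constants.Concat above.
func concat(elements ...string) string {
	return strings.Join(elements, ":")
}

// concat("Write", "Count")            -> "Write:Count"           (global limit)
// concat("bucket1", "Write", "Count") -> "bucket1:Write:Count"   (per-bucket limit)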
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 8beb954aa..f70e46b92 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -3,13 +3,16 @@ package s3api
import (
"context"
"encoding/xml"
+ "errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"math"
"net/http"
"time"
- xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"github.com/aws/aws-sdk-go/aws"
@@ -27,12 +30,14 @@ type ListAllMyBucketsResult struct {
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
+ glog.V(3).Infof("ListBucketsHandler")
+
var identity *Identity
var s3Err s3err.ErrorCode
if s3a.iam.isEnabled() {
identity, s3Err = s3a.iam.authUser(r)
if s3Err != s3err.ErrNone {
- s3err.WriteErrorResponse(w, s3Err, r)
+ s3err.WriteErrorResponse(w, r, s3Err)
return
}
}
@@ -42,16 +47,16 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
entries, _, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
- identityId := r.Header.Get(xhttp.AmzIdentityId)
+ identityId := r.Header.Get(s3_constants.AmzIdentityId)
var buckets []*s3.Bucket
for _, entry := range entries {
if entry.IsDirectory {
- if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name) {
+ if identity != nil && !identity.canDo(s3_constants.ACTION_LIST, entry.Name, "") {
continue
}
buckets = append(buckets, &s3.Bucket{
@@ -69,16 +74,17 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
Buckets: buckets,
}
- writeSuccessResponseXML(w, response)
+ writeSuccessResponseXML(w, r, response)
}
func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
- bucket, _ := getBucketAndObject(r)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("PutBucketHandler %s", bucket)
// avoid duplicated buckets
errCode := s3err.ErrNone
- if err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if resp, err := client.CollectionList(context.Background(), &filer_pb.CollectionListRequest{
IncludeEcVolumes: true,
IncludeNormalVolumes: true,
@@ -95,46 +101,63 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
}
return nil
}); err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist {
errCode = s3err.ErrBucketAlreadyExists
}
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
+ if s3a.iam.isEnabled() {
+ if _, errCode = s3a.iam.authRequest(r, s3_constants.ACTION_ADMIN); errCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, errCode)
+ return
+ }
+ }
+
fn := func(entry *filer_pb.Entry) {
- if identityId := r.Header.Get(xhttp.AmzIdentityId); identityId != "" {
+ if identityId := r.Header.Get(s3_constants.AmzIdentityId); identityId != "" {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
- entry.Extended[xhttp.AmzIdentityId] = []byte(identityId)
+ entry.Extended[s3_constants.AmzIdentityId] = []byte(identityId)
}
}
// create the folder for bucket, but lazily create actual collection
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, fn); err != nil {
glog.Errorf("PutBucketHandler mkdir: %v", err)
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
-
- writeSuccessResponseEmpty(w)
+ w.Header().Set("Location", "/"+bucket)
+ writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
- bucket, _ := getBucketAndObject(r)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("DeleteBucketHandler %s", bucket)
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
- s3err.WriteErrorResponse(w, err, r)
+ s3err.WriteErrorResponse(w, r, err)
return
}
- err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ if !s3a.option.AllowDeleteBucketNotEmpty {
+ entries, _, err := s3a.list(s3a.option.BucketsPath+"/"+bucket, "", "", false, 1)
+ if err != nil {
+ return fmt.Errorf("failed to list bucket %s: %v", bucket, err)
+ }
+ if len(entries) > 0 {
+ return errors.New(s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code)
+ }
+ }
// delete collection
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
@@ -149,26 +172,36 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
return nil
})
+ if err != nil {
+ s3ErrorCode := s3err.ErrInternalError
+ if err.Error() == s3err.GetAPIError(s3err.ErrBucketNotEmpty).Code {
+ s3ErrorCode = s3err.ErrBucketNotEmpty
+ }
+ s3err.WriteErrorResponse(w, r, s3ErrorCode)
+ return
+ }
+
err = s3a.rm(s3a.option.BucketsPath, bucket, false, true)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
- s3err.WriteEmptyResponse(w, http.StatusNoContent)
+ s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
- bucket, _ := getBucketAndObject(r)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("HeadBucketHandler %s", bucket)
- if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
- s3err.WriteErrorResponse(w, err, r)
+ if entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket); entry == nil || err == filer_pb.ErrNotFound {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
- writeSuccessResponseEmpty(w)
+ writeSuccessResponseEmpty(w, r)
}
func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode {
@@ -184,7 +217,7 @@ func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorC
}
func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
- isAdmin := r.Header.Get(xhttp.AmzIsAdmin) != ""
+ isAdmin := r.Header.Get(s3_constants.AmzIsAdmin) != ""
if isAdmin {
return true
}
@@ -192,11 +225,119 @@ func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
return true
}
- identityId := r.Header.Get(xhttp.AmzIdentityId)
- if id, ok := entry.Extended[xhttp.AmzIdentityId]; ok {
+ identityId := r.Header.Get(s3_constants.AmzIdentityId)
+ if id, ok := entry.Extended[s3_constants.AmzIdentityId]; ok {
if identityId != string(id) {
return false
}
}
return true
}
+
+// GetBucketAclHandler Get Bucket ACL
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketAcl.html
+func (s3a *S3ApiServer) GetBucketAclHandler(w http.ResponseWriter, r *http.Request) {
+ // collect parameters
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("GetBucketAclHandler %s", bucket)
+
+ if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, err)
+ return
+ }
+
+ response := AccessControlPolicy{}
+ for _, ident := range s3a.iam.identities {
+ if len(ident.Credentials) == 0 {
+ continue
+ }
+ for _, action := range ident.Actions {
+ if !action.overBucket(bucket) || action.getPermission() == "" {
+ continue
+ }
+ id := ident.Credentials[0].AccessKey
+ if response.Owner.DisplayName == "" && action.isOwner(bucket) && len(ident.Credentials) > 0 {
+ response.Owner.DisplayName = ident.Name
+ response.Owner.ID = id
+ }
+ response.AccessControlList.Grant = append(response.AccessControlList.Grant, Grant{
+ Grantee: Grantee{
+ ID: id,
+ DisplayName: ident.Name,
+ Type: "CanonicalUser",
+ XMLXSI: "CanonicalUser",
+ XMLNS: "http://www.w3.org/2001/XMLSchema-instance"},
+ Permission: action.getPermission(),
+ })
+ }
+ }
+ writeSuccessResponseXML(w, r, response)
+}
+
+// GetBucketLifecycleConfigurationHandler Get Bucket Lifecycle configuration
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html
+func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
+ // collect parameters
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("GetBucketLifecycleConfigurationHandler %s", bucket)
+
+ if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, err)
+ return
+ }
+ fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
+ if err != nil {
+ glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ ttls := fc.GetCollectionTtls(bucket)
+ if len(ttls) == 0 {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchLifecycleConfiguration)
+ return
+ }
+ response := Lifecycle{}
+ for prefix, internalTtl := range ttls {
+ ttl, _ := needle.ReadTTL(internalTtl)
+ days := int(ttl.Minutes() / 60 / 24)
+ if days == 0 {
+ continue
+ }
+ response.Rules = append(response.Rules, Rule{
+ Status: Enabled, Filter: Filter{
+ Prefix: Prefix{string: prefix, set: true},
+ set: true,
+ },
+ Expiration: Expiration{Days: days, set: true},
+ })
+ }
+ writeSuccessResponseXML(w, r, response)
+}
+
+// PutBucketLifecycleConfigurationHandler Put Bucket Lifecycle configuration
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html
+func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWriter, r *http.Request) {
+
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
+
+}
+
+// DeleteBucketMetricsConfiguration Delete Bucket Lifecycle
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html
+func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
+
+ s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
+
+}
+
+// GetBucketLocationHandler Get bucket location
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLocation.html
+func (s3a *S3ApiServer) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) {
+ writeSuccessResponseXML(w, r, LocationConstraint{})
+}
+
+// GetBucketRequestPaymentHandler Get bucket location
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketRequestPayment.html
+func (s3a *S3ApiServer) GetBucketRequestPaymentHandler(w http.ResponseWriter, r *http.Request) {
+ writeSuccessResponseXML(w, r, RequestPaymentConfiguration{Payer: "BucketOwner"})
+}
diff --git a/weed/s3api/s3api_bucket_skip_handlers.go b/weed/s3api/s3api_bucket_skip_handlers.go
new file mode 100644
index 000000000..f4ca1177d
--- /dev/null
+++ b/weed/s3api/s3api_bucket_skip_handlers.go
@@ -0,0 +1,49 @@
+package s3api
+
+import (
+ "net/http"
+
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+)
+
+// GetBucketCorsHandler Get bucket CORS
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketCors.html
+func (s3a *S3ApiServer) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchCORSConfiguration)
+}
+
+// PutBucketCorsHandler Put bucket CORS
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketCors.html
+func (s3a *S3ApiServer) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
+}
+
+// DeleteBucketCorsHandler Delete bucket CORS
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketCors.html
+func (s3a *S3ApiServer) DeleteBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
+}
+
+// GetBucketPolicyHandler Get bucket Policy
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketPolicy.html
+func (s3a *S3ApiServer) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucketPolicy)
+}
+
+// PutBucketPolicyHandler Put bucket Policy
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketPolicy.html
+func (s3a *S3ApiServer) PutBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
+}
+
+// DeleteBucketPolicyHandler Delete bucket Policy
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketPolicy.html
+func (s3a *S3ApiServer) DeleteBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
+}
+
+// PutBucketAclHandler Put bucket ACL
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketAcl.html
+func (s3a *S3ApiServer) PutBucketAclHandler(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
+}
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go
new file mode 100644
index 000000000..111b404c7
--- /dev/null
+++ b/weed/s3api/s3api_circuit_breaker.go
@@ -0,0 +1,183 @@
+package s3api
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/gorilla/mux"
+ "net/http"
+ "sync"
+ "sync/atomic"
+)
+
+type CircuitBreaker struct {
+ sync.RWMutex
+ Enabled bool
+ counters map[string]*int64
+ limitations map[string]int64
+}
+
+func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
+ cb := &CircuitBreaker{
+ counters: make(map[string]*int64),
+ limitations: make(map[string]int64),
+ }
+
+ err := pb.WithFilerClient(false, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile)
+ if err != nil {
+ return fmt.Errorf("read S3 circuit breaker config: %v", err)
+ }
+ return cb.LoadS3ApiConfigurationFromBytes(content)
+ })
+
+ if err != nil {
+ glog.Warningf("fail to load config: %v", err)
+ }
+
+ return cb
+}
+
+func (cb *CircuitBreaker) LoadS3ApiConfigurationFromBytes(content []byte) error {
+ cbCfg := &s3_pb.S3CircuitBreakerConfig{}
+ if err := filer.ParseS3ConfigurationFromBytes(content, cbCfg); err != nil {
+ glog.Warningf("unmarshal error: %v", err)
+ return fmt.Errorf("unmarshal error: %v", err)
+ }
+ if err := cb.loadCircuitBreakerConfig(cbCfg); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (cb *CircuitBreaker) loadCircuitBreakerConfig(cfg *s3_pb.S3CircuitBreakerConfig) error {
+
+ //global
+ globalEnabled := false
+ globalOptions := cfg.Global
+ limitations := make(map[string]int64)
+ if globalOptions != nil && globalOptions.Enabled && len(globalOptions.Actions) > 0 {
+ globalEnabled = globalOptions.Enabled
+ for action, limit := range globalOptions.Actions {
+ limitations[action] = limit
+ }
+ }
+ cb.Enabled = globalEnabled
+
+ //buckets
+ for bucket, cbOptions := range cfg.Buckets {
+ if cbOptions.Enabled {
+ for action, limit := range cbOptions.Actions {
+ limitations[s3_constants.Concat(bucket, action)] = limit
+ }
+ }
+ }
+
+ cb.limitations = limitations
+ return nil
+}
+
+func (cb *CircuitBreaker) Limit(f func(w http.ResponseWriter, r *http.Request), action string) (http.HandlerFunc, Action) {
+ return func(w http.ResponseWriter, r *http.Request) {
+ if !cb.Enabled {
+ f(w, r)
+ return
+ }
+
+ vars := mux.Vars(r)
+ bucket := vars["bucket"]
+
+ rollback, errCode := cb.limit(r, bucket, action)
+ defer func() {
+ for _, rf := range rollback {
+ rf()
+ }
+ }()
+
+ if errCode == s3err.ErrNone {
+ f(w, r)
+ return
+ }
+ s3err.WriteErrorResponse(w, r, errCode)
+ }, Action(action)
+}
+
+func (cb *CircuitBreaker) limit(r *http.Request, bucket string, action string) (rollback []func(), errCode s3err.ErrorCode) {
+
+ //bucket simultaneous request count
+ bucketCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
+ if bucketCountRollBack != nil {
+ rollback = append(rollback, bucketCountRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+
+ //bucket simultaneous request content bytes
+ bucketContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(bucket, action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
+ if bucketContentLengthRollBack != nil {
+ rollback = append(rollback, bucketContentLengthRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+
+ //global simultaneous request count
+ globalCountRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeCount), 1, s3err.ErrTooManyRequest)
+ if globalCountRollBack != nil {
+ rollback = append(rollback, globalCountRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+
+ //global simultaneous request content bytes
+ globalContentLengthRollBack, errCode := cb.loadCounterAndCompare(s3_constants.Concat(action, s3_constants.LimitTypeBytes), r.ContentLength, s3err.ErrRequestBytesExceed)
+ if globalContentLengthRollBack != nil {
+ rollback = append(rollback, globalContentLengthRollBack)
+ }
+ if errCode != s3err.ErrNone {
+ return
+ }
+ return
+}
+
+func (cb *CircuitBreaker) loadCounterAndCompare(key string, inc int64, errCode s3err.ErrorCode) (f func(), e s3err.ErrorCode) {
+ e = s3err.ErrNone
+ if max, ok := cb.limitations[key]; ok {
+ cb.RLock()
+ counter, exists := cb.counters[key]
+ cb.RUnlock()
+
+ if !exists {
+ cb.Lock()
+ counter, exists = cb.counters[key]
+ if !exists {
+ var newCounter int64
+ counter = &newCounter
+ cb.counters[key] = counter
+ }
+ cb.Unlock()
+ }
+ current := atomic.LoadInt64(counter)
+ if current+inc > max {
+ e = errCode
+ return
+ } else {
+ current := atomic.AddInt64(counter, inc)
+ f = func() {
+ atomic.AddInt64(counter, -inc)
+ }
+ if current > max {
+ e = errCode
+ return
+ }
+ }
+ }
+ return
+}
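
loadCounterAndCompare increments optimistically and can transiently overshoot the limit; that is why it still returns a rollback even when it reports an error, and why Limit defers all rollbacks unconditionally. The contract in isolation (sketch):

package example

import "sync/atomic"

// tryAcquire sketches the loadCounterAndCompare contract above: increment
// optimistically, and always hand the caller a rollback for any increment
// made, even when the limit check fails after the add.
func tryAcquire(counter *int64, inc, max int64) (rollback func(), ok bool) {
	if atomic.LoadInt64(counter)+inc > max {
		return nil, false // fast path: nothing incremented, nothing to roll back
	}
	rollback = func() { atomic.AddInt64(counter, -inc) }
	if atomic.AddInt64(counter, inc) > max {
		return rollback, false // raced past the limit; caller must still roll back
	}
	return rollback, true
}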
diff --git a/weed/s3api/s3api_circuit_breaker_test.go b/weed/s3api/s3api_circuit_breaker_test.go
new file mode 100644
index 000000000..5848cf164
--- /dev/null
+++ b/weed/s3api/s3api_circuit_breaker_test.go
@@ -0,0 +1,107 @@
+package s3api
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "testing"
+)
+
+type TestLimitCase struct {
+ actionName string
+
+ limitType string
+ bucketLimitValue int64
+ globalLimitValue int64
+
+ routineCount int
+ successCount int64
+}
+
+var (
+ bucket = "/test"
+ action = s3_constants.ACTION_WRITE
+ fileSize int64 = 200
+
+ TestLimitCases = []*TestLimitCase{
+
+ //bucket-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 5, 6, 60, 5},
+ {action, s3_constants.LimitTypeCount, 0, 6, 6, 0},
+
+ //global-LimitTypeCount
+ {action, s3_constants.LimitTypeCount, 6, 5, 6, 5},
+ {action, s3_constants.LimitTypeCount, 6, 0, 6, 0},
+
+ //bucket-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1000, 1020, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 0, 1020, 6, 0},
+
+ //global-LimitTypeBytes
+ {action, s3_constants.LimitTypeBytes, 1020, 1000, 6, 5},
+ {action, s3_constants.LimitTypeBytes, 1020, 0, 6, 0},
+ }
+)
+
+func TestLimit(t *testing.T) {
+ for _, tc := range TestLimitCases {
+ circuitBreakerConfig := &s3_pb.S3CircuitBreakerConfig{
+ Global: &s3_pb.S3CircuitBreakerOptions{
+ Enabled: true,
+ Actions: map[string]int64{
+ s3_constants.Concat(tc.actionName, tc.limitType): tc.globalLimitValue,
+ },
+ },
+ Buckets: map[string]*s3_pb.S3CircuitBreakerOptions{
+ bucket: {
+ Enabled: true,
+ Actions: map[string]int64{
+ s3_constants.Concat(tc.actionName, tc.limitType): tc.bucketLimitValue,
+ },
+ },
+ },
+ }
+ circuitBreaker := &CircuitBreaker{
+ counters: make(map[string]*int64),
+ limitations: make(map[string]int64),
+ }
+ err := circuitBreaker.loadCircuitBreakerConfig(circuitBreakerConfig)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ successCount := doLimit(circuitBreaker, tc.routineCount, &http.Request{ContentLength: fileSize}, tc.actionName)
+ if successCount != tc.successCount {
+ t.Errorf("successCount not equal, expect=%d, actual=%d, case: %v", tc.successCount, successCount, tc)
+ }
+ }
+}
+
+func doLimit(circuitBreaker *CircuitBreaker, routineCount int, r *http.Request, action string) int64 {
+ var successCounter int64
+ resultCh := make(chan []func(), routineCount)
+ var wg sync.WaitGroup
+ for i := 0; i < routineCount; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ rollbackFn, errCode := circuitBreaker.limit(r, bucket, action)
+ if errCode == s3err.ErrNone {
+ atomic.AddInt64(&successCounter, 1)
+ }
+ resultCh <- rollbackFn
+ }()
+ }
+ wg.Wait()
+ close(resultCh)
+ for fns := range resultCh {
+ for _, fn := range fns {
+ fn()
+ }
+ }
+ return successCounter
+}
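The test drives limit directly; in a server, the returned rollback slice would typically be released when the handler returns. A hypothetical, fully standalone sketch of that wiring — errorCode and breaker stand in for the package's s3err.ErrorCode and CircuitBreaker, and only the defer-rollback pattern is the point:

package main

import (
	"fmt"
	"net/http"
)

type errorCode int

const errNone errorCode = 0

type breaker struct{}

func (cb *breaker) limit(r *http.Request, bucket, action string) ([]func(), errorCode) {
	return nil, errNone // admission logic elided; see loadCounterAndCompare above
}

func (cb *breaker) withLimit(bucket, action string, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		rollback, errCode := cb.limit(r, bucket, action)
		// release every acquired counter (bucket- and global-level) once done
		defer func() {
			for _, fn := range rollback {
				fn()
			}
		}()
		if errCode != errNone {
			http.Error(w, "TooManyRequests", http.StatusTooManyRequests)
			return
		}
		next(w, r)
	}
}

func main() {
	cb := &breaker{}
	http.Handle("/test", cb.withLimit("/test", "Write", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	}))
	// http.ListenAndServe(":8080", nil) would serve this in a real program
}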
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index b4d2c22e7..4ace4bb21 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -13,24 +13,26 @@ import (
var _ = filer_pb.FilerClient(&S3ApiServer{})
-func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
+ }, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption)
}
+
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
-func writeSuccessResponseXML(w http.ResponseWriter, response interface{}) {
- s3err.WriteXMLResponse(w, http.StatusOK, response)
+func writeSuccessResponseXML(w http.ResponseWriter, r *http.Request, response interface{}) {
+ s3err.WriteXMLResponse(w, r, http.StatusOK, response)
+ s3err.PostLog(r, http.StatusOK, s3err.ErrNone)
}
-func writeSuccessResponseEmpty(w http.ResponseWriter) {
- s3err.WriteEmptyResponse(w, http.StatusOK)
+func writeSuccessResponseEmpty(w http.ResponseWriter, r *http.Request) {
+ s3err.WriteEmptyResponse(w, r, http.StatusOK)
}
func validateContentMd5(h http.Header) ([]byte, error) {
diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go
index 60dd54152..9157748f6 100644
--- a/weed/s3api/s3api_object_copy_handlers.go
+++ b/weed/s3api/s3api_object_copy_handlers.go
@@ -3,8 +3,9 @@ package s3api
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
+ "modernc.org/strutil"
"net/http"
"net/url"
"strconv"
@@ -14,9 +15,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+const (
+ DirectiveCopy = "COPY"
+ DirectiveReplace = "REPLACE"
+)
+
func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
- dstBucket, dstObject := getBucketAndObject(r)
+ dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
// Copy source path.
cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
@@ -27,19 +33,25 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
- if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && isReplace(r) {
+ glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject)
+
+ replaceMeta, replaceTagging := replaceDirective(r.Header)
+
+ if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) {
fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
dir, name := fullPath.DirAndName()
entry, err := s3a.getEntry(dir, name)
- if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r)
+ if err != nil || entry.IsDirectory {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
+ return
}
- entry.Extended = weed_server.SaveAmzMetaData(r, entry.Extended, isReplace(r))
+ entry.Extended = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
err = s3a.touch(dir, name, entry)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
+ return
}
- writeSuccessResponseXML(w, CopyObjectResult{
+ writeSuccessResponseXML(w, r, CopyObjectResult{
ETag: fmt.Sprintf("%x", entry.Attributes.Md5),
LastModified: time.Now().UTC(),
})
@@ -48,32 +60,44 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
+ return
+ }
+ srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
+ dir, name := srcPath.DirAndName()
+ if entry, err := s3a.getEntry(dir, name); err != nil || entry.IsDirectory {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
if srcBucket == dstBucket && srcObject == dstObject {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidCopyDest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest)
return
}
dstUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
- s3a.option.Filer, s3a.option.BucketsPath, dstBucket, dstObject, dstBucket)
+ s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject), dstBucket)
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, srcBucket, srcObject)
+ s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
- _, _, resp, err := util.DownloadFile(srcUrl, "")
+ _, _, resp, err := util.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
defer util.CloseResponse(resp)
+ tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name)
+ if tagErr != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
+ return
+ }
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
- etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body)
+ destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
+ etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination)
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
@@ -84,7 +108,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
LastModified: time.Now().UTC(),
}
- writeSuccessResponseXML(w, response)
+ writeSuccessResponseXML(w, r, response)
}
@@ -105,7 +129,7 @@ type CopyPartResult struct {
func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
// https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
- dstBucket, _ := getBucketAndObject(r)
+ dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
// Copy source path.
cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
@@ -117,7 +141,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
// If source object is empty or bucket is empty, reply back invalid copy source.
if srcObject == "" || srcBucket == "" {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
@@ -126,35 +150,38 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
partID, err := strconv.Atoi(partIDString)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidPart, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
return
}
+ glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d", srcBucket, srcObject, dstBucket, partID)
+
// check partID with maximum part ID for multipart objects
if partID > globalMaxPartID {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
return
}
rangeHeader := r.Header.Get("x-amz-copy-source-range")
dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
- s3a.option.Filer, s3a.genUploadsFolder(dstBucket), uploadID, partID, dstBucket)
+ s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID, dstBucket)
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, srcBucket, srcObject)
+ s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
- dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, rangeHeader)
+ dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidCopySource, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
}
defer dataReader.Close()
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
- etag, errCode := s3a.putToFiler(r, dstUrl, dataReader)
+ destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
+ etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination)
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
@@ -165,10 +192,111 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
LastModified: time.Now().UTC(),
}
- writeSuccessResponseXML(w, response)
+ writeSuccessResponseXML(w, r, response)
}
-func isReplace(r *http.Request) bool {
- return r.Header.Get("X-Amz-Metadata-Directive") == "REPLACE"
+func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) {
+ return reqHeader.Get(s3_constants.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(s3_constants.AmzObjectTaggingDirective) == DirectiveReplace
+}
+
+func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) {
+ if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) == 0 {
+ if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
+ reqHeader[s3_constants.AmzStorageClass] = sc
+ }
+ }
+
+ if !replaceMeta {
+ for header := range reqHeader {
+ if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
+ delete(reqHeader, header)
+ }
+ }
+ for k, v := range existing {
+ if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
+ reqHeader[k] = v
+ }
+ }
+ }
+
+ if !replaceTagging {
+ for header := range reqHeader {
+ if strings.HasPrefix(header, s3_constants.AmzObjectTagging) {
+ delete(reqHeader, header)
+ }
+ }
+
+ found := false
+ for k := range existing {
+ if strings.HasPrefix(k, s3_constants.AmzObjectTaggingPrefix) {
+ found = true
+ break
+ }
+ }
+
+ if found {
+ tags, err := getTags(dir, name)
+ if err != nil {
+ return err
+ }
+
+ var tagArr []string
+ for k, v := range tags {
+ tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v))
+ }
+ tagStr := strutil.JoinFields(tagArr, "&")
+ reqHeader.Set(s3_constants.AmzObjectTagging, tagStr)
+ }
+ }
+ return
+}
+
+func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte) {
+ metadata = make(map[string][]byte)
+
+ if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
+ metadata[s3_constants.AmzStorageClass] = sc
+ }
+ if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) > 0 {
+ metadata[s3_constants.AmzStorageClass] = []byte(sc)
+ }
+
+ if replaceMeta {
+ for header, values := range reqHeader {
+ if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
+ for _, value := range values {
+ metadata[header] = []byte(value)
+ }
+ }
+ }
+ } else {
+ for k, v := range existing {
+ if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
+ metadata[k] = v
+ }
+ }
+ }
+
+ if replaceTagging {
+ if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" {
+ for _, v := range strings.Split(tags, "&") {
+ tag := strings.Split(v, "=")
+ if len(tag) == 2 {
+ metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+ } else if len(tag) == 1 {
+ metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = nil
+ }
+ }
+ }
+ } else {
+ for k, v := range existing {
+ if strings.HasPrefix(k, s3_constants.AmzObjectTagging) {
+ metadata[k] = v
+ }
+ }
+ delete(metadata, s3_constants.AmzTagCount)
+ }
+
+ return
}
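X-Amz-Metadata-Directive and X-Amz-Tagging-Directive each accept COPY (the default: keep the source entry's values) or REPLACE (take the values from the request). A self-contained illustration of the metadata half of that merge rule — illustrative code, not the handler itself:

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// REPLACE means request metadata wins; with COPY (the default) the
	// existing entry's metadata would be kept instead.
	req := http.Header{}
	req.Set("X-Amz-Metadata-Directive", "REPLACE")
	req.Set("X-Amz-Meta-Color", "blue")

	existing := map[string][]byte{"X-Amz-Meta-Color": []byte("red")}

	merged := map[string][]byte{}
	if req.Get("X-Amz-Metadata-Directive") == "REPLACE" {
		for k, v := range req {
			if strings.HasPrefix(k, "X-Amz-Meta-") && len(v) > 0 {
				merged[k] = []byte(v[0])
			}
		}
	} else {
		for k, v := range existing {
			if strings.HasPrefix(k, "X-Amz-Meta-") {
				merged[k] = v
			}
		}
	}
	fmt.Printf("%s\n", merged["X-Amz-Meta-Color"]) // blue
}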
diff --git a/weed/s3api/s3api_object_copy_handlers_test.go b/weed/s3api/s3api_object_copy_handlers_test.go
new file mode 100644
index 000000000..610b29a6b
--- /dev/null
+++ b/weed/s3api/s3api_object_copy_handlers_test.go
@@ -0,0 +1,426 @@
+package s3api
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "net/http"
+ "reflect"
+ "sort"
+ "strings"
+ "testing"
+)
+
+type H map[string]string
+
+func (h H) String() string {
+ pairs := make([]string, 0, len(h))
+ for k, v := range h {
+ pairs = append(pairs, fmt.Sprintf("%s : %s", k, v))
+ }
+ sort.Strings(pairs)
+ join := strings.Join(pairs, "\n")
+ return "\n" + join + "\n"
+}
+
+var processMetadataTestCases = []struct {
+ caseId int
+ request H
+ existing H
+ getTags H
+ want H
+}{
+ {
+ 201,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-Type": "existing",
+ },
+ H{
+ "A": "B",
+ "a": "b",
+ "type": "existing",
+ },
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging": "A=B&a=b&type=existing",
+ },
+ },
+ {
+ 202,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-Type": "existing",
+ },
+ H{
+ "A": "B",
+ "a": "b",
+ "type": "existing",
+ },
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=existing",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ },
+ },
+
+ {
+ 203,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-Type": "existing",
+ },
+ H{
+ "A": "B",
+ "a": "b",
+ "type": "existing",
+ },
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ },
+
+ {
+ 204,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-Type": "existing",
+ },
+ H{
+ "A": "B",
+ "a": "b",
+ "type": "existing",
+ },
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ },
+
+ {
+ 205,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{},
+ H{},
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ },
+
+ {
+ 206,
+ H{
+ "User-Agent": "firefox",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-Type": "existing",
+ },
+ H{
+ "A": "B",
+ "a": "b",
+ "type": "existing",
+ },
+ H{
+ "User-Agent": "firefox",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ },
+
+ {
+ 207,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-Type": "existing",
+ },
+ H{
+ "A": "B",
+ "a": "b",
+ "type": "existing",
+ },
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ },
+}
+var processMetadataBytesTestCases = []struct {
+ caseId int
+ request H
+ existing H
+ want H
+}{
+ {
+ 101,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "existing",
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "existing",
+ },
+ },
+
+ {
+ 102,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "existing",
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "existing",
+ },
+ },
+
+ {
+ 103,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "existing",
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "request",
+ },
+ },
+
+ {
+ 104,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "existing",
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "request",
+ },
+ },
+
+ {
+ 105,
+ H{
+ "User-Agent": "firefox",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{
+ "X-Amz-Meta-My-Meta": "existing",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "existing",
+ },
+ H{},
+ },
+
+ {
+ 107,
+ H{
+ "User-Agent": "firefox",
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging": "A=B&a=b&type=request",
+ s3_constants.AmzUserMetaDirective: DirectiveReplace,
+ s3_constants.AmzObjectTaggingDirective: DirectiveReplace,
+ },
+ H{},
+ H{
+ "X-Amz-Meta-My-Meta": "request",
+ "X-Amz-Tagging-A": "B",
+ "X-Amz-Tagging-a": "b",
+ "X-Amz-Tagging-type": "request",
+ },
+ },
+}
+
+func TestProcessMetadata(t *testing.T) {
+ for _, tc := range processMetadataTestCases {
+ reqHeader := transferHToHeader(tc.request)
+ existing := transferHToHeader(tc.existing)
+ replaceMeta, replaceTagging := replaceDirective(reqHeader)
+
+ err := processMetadata(reqHeader, existing, replaceMeta, replaceTagging, func(_ string, _ string) (tags map[string]string, err error) {
+ return tc.getTags, nil
+ }, "", "")
+ if err != nil {
+ t.Error(err)
+ }
+
+ result := transferHeaderToH(reqHeader)
+ fmtTagging(result, tc.want)
+
+ if !reflect.DeepEqual(result, tc.want) {
+ t.Error(fmt.Errorf("\n### CaseID: %d ###"+
+ "\nRequest:%v"+
+ "\nExisting:%v"+
+ "\nGetTags:%v"+
+ "\nWant:%v"+
+ "\nActual:%v",
+ tc.caseId, tc.request, tc.existing, tc.getTags, tc.want, result))
+ }
+ }
+}
+
+func TestProcessMetadataBytes(t *testing.T) {
+ for _, tc := range processMetadataBytesTestCases {
+ reqHeader := transferHToHeader(tc.request)
+ existing := transferHToBytesArr(tc.existing)
+ replaceMeta, replaceTagging := replaceDirective(reqHeader)
+ extends := processMetadataBytes(reqHeader, existing, replaceMeta, replaceTagging)
+
+ result := transferBytesArrToH(extends)
+ fmtTagging(result, tc.want)
+
+ if !reflect.DeepEqual(result, tc.want) {
+ t.Error(fmt.Errorf("\n### CaseID: %d ###"+
+ "\nRequest:%v"+
+ "\nExisting:%v"+
+ "\nWant:%v"+
+ "\nActual:%v",
+ tc.caseId, tc.request, tc.existing, tc.want, result))
+ }
+ }
+}
+
+func fmtTagging(maps ...map[string]string) {
+ for _, m := range maps {
+ if tagging := m[s3_constants.AmzObjectTagging]; len(tagging) > 0 {
+ split := strings.Split(tagging, "&")
+ sort.Strings(split)
+ m[s3_constants.AmzObjectTagging] = strings.Join(split, "&")
+ }
+ }
+}
+
+func transferHToHeader(data map[string]string) http.Header {
+ header := http.Header{}
+ for k, v := range data {
+ header.Add(k, v)
+ }
+ return header
+}
+
+func transferHToBytesArr(data map[string]string) map[string][]byte {
+ m := make(map[string][]byte, len(data))
+ for k, v := range data {
+ m[k] = []byte(v)
+ }
+ return m
+}
+
+func transferBytesArrToH(data map[string][]byte) H {
+ m := make(map[string]string, len(data))
+ for k, v := range data {
+ m[k] = string(v)
+ }
+ return m
+}
+
+func transferHeaderToH(data map[string][]string) H {
+ m := make(map[string]string, len(data))
+ for k, v := range data {
+ m[k] = v[len(v)-1]
+ }
+ return m
+}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 845c9a577..4ad3454ba 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -1,23 +1,25 @@
package s3api
import (
+ "bytes"
"crypto/md5"
"encoding/json"
"encoding/xml"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/pquerna/cachecontrol/cacheobject"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+ "golang.org/x/exp/slices"
"io"
- "io/ioutil"
"net/http"
"net/url"
- "sort"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/pquerna/cachecontrol/cacheobject"
- "github.com/gorilla/mux"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -25,39 +27,43 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-var (
- client *http.Client
+const (
+ deleteMultipleObjectsLimit = 1000
)
-func init() {
- client = &http.Client{Transport: &http.Transport{
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
+func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
+ mimeBuffer := make([]byte, 512)
+ size, _ := dataReader.Read(mimeBuffer)
+ if size > 0 {
+ r.Header.Set("Content-Type", http.DetectContentType(mimeBuffer[:size]))
+ return io.NopCloser(io.MultiReader(bytes.NewReader(mimeBuffer[:size]), dataReader))
+ }
+ return io.NopCloser(dataReader)
}
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("PutObjectHandler %s %s", bucket, object)
_, err := validateContentMd5(r.Header)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidDigest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
return
}
if r.Header.Get("Cache-Control") != "" {
if _, err = cacheobject.ParseRequestCacheControl(r.Header.Get("Cache-Control")); err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidDigest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidDigest)
return
}
}
if r.Header.Get("Expires") != "" {
if _, err = time.Parse(http.TimeFormat, r.Header.Get("Expires")); err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidDigest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedExpires)
return
}
}
@@ -75,36 +81,45 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
_, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
}
if s3ErrCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, s3ErrCode, r)
+ s3err.WriteErrorResponse(w, r, s3ErrCode)
return
}
} else {
if authTypeStreamingSigned == rAuthType {
- s3err.WriteErrorResponse(w, s3err.ErrAuthNotSetup, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrAuthNotSetup)
return
}
}
defer dataReader.Close()
+ objectContentType := r.Header.Get("Content-Type")
if strings.HasSuffix(object, "/") {
- if err := s3a.mkdir(s3a.option.BucketsPath, bucket+object, nil); err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ if err := s3a.mkdir(s3a.option.BucketsPath, bucket+strings.TrimSuffix(object, "/"), func(entry *filer_pb.Entry) {
+ if objectContentType == "" {
+ objectContentType = "httpd/unix-directory"
+ }
+ entry.Attributes.Mime = objectContentType
+ }); err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
} else {
- uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
+ uploadUrl := s3a.toFilerUrl(bucket, object)
+ if objectContentType == "" {
+ dataReader = mimeDetect(r, dataReader)
+ }
- etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
+ etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
setEtag(w, etag)
}
- writeSuccessResponseEmpty(w)
+ writeSuccessResponseEmpty(w, r)
}
func urlPathEscape(object string) string {
@@ -115,45 +130,51 @@ func urlPathEscape(object string) string {
return strings.Join(escapedParts, "/")
}
+func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
+ destUrl := fmt.Sprintf("http://%s%s/%s%s",
+ s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object))
+ return destUrl
+}
+
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
if strings.HasSuffix(r.URL.Path, "/") {
- s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
return
}
- destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
-
- s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
+ destUrl := s3a.toFilerUrl(bucket, object)
+ s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
}
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
-
- destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)
- s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
+ destUrl := s3a.toFilerUrl(bucket, object)
+ s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
}
func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
- destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
- s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
+ destUrl := s3a.toFilerUrl(bucket, object)
- s3a.proxyToFiler(w, r, destUrl, func(proxyResponse *http.Response, w http.ResponseWriter) {
+ s3a.proxyToFiler(w, r, destUrl, true, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int) {
+ statusCode = http.StatusNoContent
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
- w.WriteHeader(http.StatusNoContent)
+ w.WriteHeader(statusCode)
+ return statusCode
})
}
@@ -191,30 +212,39 @@ type DeleteObjectsResponse struct {
// DeleteMultipleObjectsHandler - Delete multiple objects
func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
- bucket, _ := getBucketAndObject(r)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("DeleteMultipleObjectsHandler %s", bucket)
- deleteXMLBytes, err := ioutil.ReadAll(r.Body)
+ deleteXMLBytes, err := io.ReadAll(r.Body)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
deleteObjects := &DeleteObjectsRequest{}
if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrMalformedXML, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
+ return
+ }
+
+ if len(deleteObjects.Objects) > deleteMultipleObjectsLimit {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxDeleteObjects)
return
}
var deletedObjects []ObjectIdentifier
var deleteErrors []DeleteError
+ var auditLog *s3err.AccessLog
directoriesWithDeletion := make(map[string]int)
- s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if s3err.Logger != nil {
+ auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
+ }
+ s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// delete file entries
for _, object := range deleteObjects.Objects {
-
lastSeparator := strings.LastIndex(object.ObjectName, "/")
parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false
if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
@@ -237,6 +267,10 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
Key: object.ObjectName,
})
}
+ if auditLog != nil {
+ auditLog.Key = entryName
+ s3err.PostAccessLog(*auditLog)
+ }
}
// purge empty folders, only checking folders with deletions
@@ -253,7 +287,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
deleteResp.Errors = deleteErrors
- writeSuccessResponseXML(w, deleteResp)
+ writeSuccessResponseXML(w, r, deleteResp)
}
@@ -262,8 +296,8 @@ func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerCli
for dir, _ := range directoriesWithDeletion {
allDirs = append(allDirs, dir)
}
- sort.Slice(allDirs, func(i, j int) bool {
- return len(allDirs[i]) > len(allDirs[j])
+ slices.SortFunc(allDirs, func(a, b string) bool {
+ return len(a) > len(b)
})
newDirectoriesWithDeletion = make(map[string]int)
for _, dir := range allDirs {
@@ -280,87 +314,81 @@ func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerCli
return
}
-var passThroughHeaders = []string{
- "response-cache-control",
- "response-content-disposition",
- "response-content-encoding",
- "response-content-language",
- "response-content-type",
- "response-expires",
-}
-
-func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter)) {
+func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int)) {
- glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
+ glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
if err != nil {
glog.Errorf("NewRequest %s: %v", destUrl, err)
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
- proxyReq.Header.Set("Host", s3a.option.Filer)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
-
- for header, values := range r.Header {
- // handle s3 related headers
- passed := false
- for _, h := range passThroughHeaders {
- if strings.ToLower(header) == h && len(values) > 0 {
- proxyReq.Header.Add(header[len("response-"):], values[0])
- passed = true
- break
- }
- }
- if passed {
- continue
- }
- // handle other headers
- for _, value := range values {
- proxyReq.Header.Add(header, value)
+ for k, v := range r.URL.Query() {
+ if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
+ proxyReq.Header[k] = v
}
}
+ for header, values := range r.Header {
+ proxyReq.Header[header] = values
+ }
- resp, postErr := client.Do(proxyReq)
+ // ensure that the Authorization header is overriding any previous
+ // Authorization header which might be already present in proxyReq
+ s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
+ resp, postErr := s3a.client.Do(proxyReq)
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
defer util.CloseResponse(resp)
if resp.StatusCode == http.StatusPreconditionFailed {
- s3err.WriteErrorResponse(w, s3err.ErrPreconditionFailed, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
+ return
+ }
+
+ if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
return
}
if (resp.ContentLength == -1 || resp.StatusCode == 404) && resp.StatusCode != 304 {
if r.Method != "DELETE" {
- s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
- responseFn(resp, w)
-
+ responseStatusCode := responseFn(resp, w)
+ s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
}
-func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) {
+func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int) {
for k, v := range proxyResponse.Header {
w.Header()[k] = v
}
if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 {
w.WriteHeader(http.StatusPartialContent)
+ statusCode = http.StatusPartialContent
} else {
- w.WriteHeader(proxyResponse.StatusCode)
+ statusCode = proxyResponse.StatusCode
+ }
+ w.WriteHeader(statusCode)
+ buf := mem.Allocate(128 * 1024)
+ defer mem.Free(buf)
+ if n, err := io.CopyBuffer(w, proxyResponse.Body, buf); err != nil {
+ glog.V(1).Infof("passthrough response read %d bytes: %v", n, err)
}
- io.Copy(w, proxyResponse.Body)
+ return statusCode
}
-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code s3err.ErrorCode) {
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string) (etag string, code s3err.ErrorCode) {
hash := md5.New()
var body = io.TeeReader(dataReader, hash)
@@ -372,16 +400,20 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
return "", s3err.ErrInternalError
}
- proxyReq.Header.Set("Host", s3a.option.Filer)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+ if destination != "" {
+ proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
+ }
for header, values := range r.Header {
for _, value := range values {
proxyReq.Header.Add(header, value)
}
}
-
- resp, postErr := client.Do(proxyReq)
+ // ensure that the Authorization header is overriding any previous
+ // Authorization header which might be already present in proxyReq
+ s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
+ resp, postErr := s3a.client.Do(proxyReq)
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
@@ -391,7 +423,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
etag = fmt.Sprintf("%x", hash.Sum(nil))
- resp_body, ra_err := ioutil.ReadAll(resp.Body)
+ resp_body, ra_err := io.ReadAll(resp.Body)
if ra_err != nil {
glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
return etag, s3err.ErrInternalError
@@ -420,20 +452,33 @@ func setEtag(w http.ResponseWriter, etag string) {
}
}
-func getBucketAndObject(r *http.Request) (bucket, object string) {
- vars := mux.Vars(r)
- bucket = vars["bucket"]
- object = vars["object"]
- if !strings.HasPrefix(object, "/") {
- object = "/" + object
+func filerErrorToS3Error(errString string) s3err.ErrorCode {
+ switch {
+ case strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory"):
+ return s3err.ErrExistingObjectIsDirectory
+ case strings.HasSuffix(errString, "is a file"):
+ return s3err.ErrExistingObjectIsFile
+ default:
+ return s3err.ErrInternalError
}
+}
- return
+func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) {
+ encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite)
+
+ if encodedJwt == "" {
+ return
+ }
+
+ r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
}
-func filerErrorToS3Error(errString string) s3err.ErrorCode {
- if strings.HasPrefix(errString, "existing ") && strings.HasSuffix(errString, "is a directory") {
- return s3err.ErrExistingObjectIsDirectory
+func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string {
+ var encodedJwt security.EncodedJwt
+ if isWrite {
+ encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec)
+ } else {
+ encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)
}
- return s3err.ErrInternalError
+ return string(encodedJwt)
}
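mimeDetect sniffs at most 512 bytes — the window http.DetectContentType actually inspects — and then splices the consumed prefix back in front of the remaining stream, so the upload body reaches the filer unchanged. A standalone sketch of that sniff-and-replay trick:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// sniff reads up to 512 bytes, classifies them, and returns a reader that
// replays the consumed prefix ahead of the rest of the stream.
func sniff(r io.Reader) (string, io.Reader) {
	buf := make([]byte, 512) // http.DetectContentType inspects at most 512 bytes
	n, _ := r.Read(buf)
	return http.DetectContentType(buf[:n]), io.MultiReader(bytes.NewReader(buf[:n]), r)
}

func main() {
	ctype, replay := sniff(bytes.NewReader([]byte("%PDF-1.7 ...")))
	rest, _ := io.ReadAll(replay)
	fmt.Println(ctype, len(rest)) // application/pdf 12
}

As in the handler, a single Read may return fewer than 512 bytes, which only narrows the sniffing window; the replayed stream is always complete.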
diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go
index e1125689f..5704fcf38 100644
--- a/weed/s3api/s3api_object_handlers_postpolicy.go
+++ b/weed/s3api/s3api_object_handlers_postpolicy.go
@@ -5,16 +5,17 @@ import (
"encoding/base64"
"errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/s3api/policy"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
- "github.com/dustin/go-humanize"
- "github.com/gorilla/mux"
"io"
- "io/ioutil"
"mime/multipart"
"net/http"
"net/url"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/s3api/policy"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/dustin/go-humanize"
+ "github.com/gorilla/mux"
)
func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.Request) {
@@ -24,25 +25,27 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
bucket := mux.Vars(r)["bucket"]
+ glog.V(3).Infof("PostPolicyBucketHandler %s", bucket)
+
reader, err := r.MultipartReader()
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest)
return
}
form, err := reader.ReadForm(int64(5 * humanize.MiByte))
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest)
return
}
defer form.RemoveAll()
fileBody, fileName, fileSize, formValues, err := extractPostPolicyFormValues(form)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest)
return
}
if fileBody == nil {
- s3err.WriteErrorResponse(w, s3err.ErrPOSTFileRequired, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrPOSTFileRequired)
return
}
defer fileBody.Close()
@@ -60,7 +63,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
if successRedirect != "" {
redirectURL, err = url.Parse(successRedirect)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest)
return
}
}
@@ -68,13 +71,13 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
// Verify policy signature.
errCode := s3a.iam.doesPolicySignatureMatch(formValues)
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
policyBytes, err := base64.StdEncoding.DecodeString(formValues.Get("Policy"))
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrMalformedPOSTRequest, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedPOSTRequest)
return
}
@@ -83,7 +86,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
postPolicyForm, err := policy.ParsePostPolicyForm(string(policyBytes))
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrPostPolicyConditionInvalidFormat, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrPostPolicyConditionInvalidFormat)
return
}
@@ -99,23 +102,23 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
lengthRange := postPolicyForm.Conditions.ContentLengthRange
if lengthRange.Valid {
if fileSize < lengthRange.Min {
- s3err.WriteErrorResponse(w, s3err.ErrEntityTooSmall, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrEntityTooSmall)
return
}
if fileSize > lengthRange.Max {
- s3err.WriteErrorResponse(w, s3err.ErrEntityTooLarge, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrEntityTooLarge)
return
}
}
}
- uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer, s3a.option.BucketsPath, bucket, urlPathEscape(object))
+ uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlPathEscape(object))
- etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody)
+ etag, errCode := s3a.putToFiler(r, uploadUrl, fileBody, "")
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
@@ -123,7 +126,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
// Replace raw query params..
redirectURL.RawQuery = getRedirectPostRawQuery(bucket, object, etag)
w.Header().Set("Location", redirectURL.String())
- s3err.WriteEmptyResponse(w, http.StatusSeeOther)
+ s3err.WriteEmptyResponse(w, r, http.StatusSeeOther)
return
}
@@ -138,18 +141,19 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
ETag: `"` + etag + `"`,
Location: w.Header().Get("Location"),
}
- s3err.WriteXMLResponse(w, http.StatusCreated, resp)
+ s3err.WriteXMLResponse(w, r, http.StatusCreated, resp)
+ s3err.PostLog(r, http.StatusCreated, s3err.ErrNone)
case "200":
- s3err.WriteEmptyResponse(w, http.StatusOK)
+ s3err.WriteEmptyResponse(w, r, http.StatusOK)
default:
- writeSuccessResponseEmpty(w)
+ writeSuccessResponseEmpty(w, r)
}
}
// Extract form fields and file data from a HTTP POST Policy
func extractPostPolicyFormValues(form *multipart.Form) (filePart io.ReadCloser, fileName string, fileSize int64, formValues http.Header, err error) {
- /// HTML Form values
+ // HTML Form values
fileName = ""
// Canonicalize the form values into http.Header.
@@ -172,7 +176,7 @@ func extractPostPolicyFormValues(form *multipart.Form) (filePart io.ReadCloser,
b.WriteString(v)
}
fileSize = int64(b.Len())
- filePart = ioutil.NopCloser(b)
+ filePart = io.NopCloser(b)
return filePart, fileName, fileSize, formValues, nil
}
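For reference, a hypothetical client-side counterpart to this handler: a browser-style multipart/form-data POST with success_action_status=201, which asks for the XML PostResponse body. The endpoint, bucket, and key are made up, and the Policy and signature fields that a secured gateway requires are omitted:

package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
	"net/http"
)

func main() {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)
	_ = w.WriteField("key", "uploads/hello.txt")
	_ = w.WriteField("success_action_status", "201") // ask for the XML PostResponse body
	part, _ := w.CreateFormFile("file", "hello.txt")
	_, _ = part.Write([]byte("hello"))
	_ = w.Close()

	req, _ := http.NewRequest(http.MethodPost, "http://localhost:8333/bucket", &body)
	req.Header.Set("Content-Type", w.FormDataContentType())
	fmt.Println(req.Method, req.URL) // send with http.DefaultClient.Do(req) against a live gateway
}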
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index 308d9a5f5..768f4d180 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -1,15 +1,20 @@
package s3api
import (
+ "crypto/sha1"
+ "encoding/xml"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
+ "io"
"net/http"
"net/url"
"strconv"
"strings"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
)
@@ -23,7 +28,7 @@ const (
// NewMultipartUploadHandler - New multipart upload.
func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
createMultipartUploadInput := &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucket),
@@ -36,49 +41,71 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
createMultipartUploadInput.Metadata[k] = aws.String(string(v))
}
+ contentType := r.Header.Get("Content-Type")
+ if contentType != "" {
+ createMultipartUploadInput.ContentType = &contentType
+ }
response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
- writeSuccessResponseXML(w, response)
+ writeSuccessResponseXML(w, r, response)
}
// CompleteMultipartUploadHandler - Completes multipart upload.
func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
+
+ bucket, object := s3_constants.GetBucketAndObject(r)
+
+ parts := &CompleteMultipartUpload{}
+ if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
+ return
+ }
// Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query())
+ err := s3a.checkUploadId(object, uploadID)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
+ return
+ }
response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket),
Key: objectKey(aws.String(object)),
UploadId: aws.String(uploadID),
- })
+ }, parts)
glog.V(2).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
- writeSuccessResponseXML(w, response)
+ writeSuccessResponseXML(w, r, response)
}
// AbortMultipartUploadHandler - Aborts multipart upload.
func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
// Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query())
+ err := s3a.checkUploadId(object, uploadID)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
+ return
+ }
response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(bucket),
@@ -87,29 +114,31 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
})
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))
- writeSuccessResponseXML(w, response)
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
+ s3err.WriteXMLResponse(w, r, http.StatusNoContent, response)
+ s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
}
// ListMultipartUploadsHandler - Lists multipart uploads.
func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
- bucket, _ := getBucketAndObject(r)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
if maxUploads < 0 {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxUploads, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads)
return
}
if keyMarker != "" {
// Marker not common with prefix is not implemented.
if !strings.HasPrefix(keyMarker, prefix) {
- s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
return
}
}
@@ -124,29 +153,35 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
UploadIdMarker: aws.String(uploadIDMarker),
})
- glog.V(2).Info("ListMultipartUploadsHandler", string(s3err.EncodeXMLResponse(response)), errCode)
+ glog.V(2).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode)
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
// TODO handle encodingType
- writeSuccessResponseXML(w, response)
+ writeSuccessResponseXML(w, r, response)
}
// ListObjectPartsHandler - Lists object parts in a multipart upload.
func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
if partNumberMarker < 0 {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidPartNumberMarker, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker)
return
}
if maxParts < 0 {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
+ return
+ }
+
+ err := s3a.checkUploadId(object, uploadID)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
@@ -158,36 +193,36 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
UploadId: aws.String(uploadID),
})
- glog.V(2).Info("ListObjectPartsHandler", string(s3err.EncodeXMLResponse(response)), errCode)
-
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
- writeSuccessResponseXML(w, response)
+ glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
+
+ writeSuccessResponseXML(w, r, response)
}
// PutObjectPartHandler - Put an object part in a multipart upload.
func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
- bucket, _ := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
uploadID := r.URL.Query().Get("uploadId")
- exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
- if !exists {
- s3err.WriteErrorResponse(w, s3err.ErrNoSuchUpload, r)
+ err := s3a.checkUploadId(object, uploadID)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
return
}
partIDString := r.URL.Query().Get("partNumber")
partID, err := strconv.Atoi(partIDString)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidPart, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
return
}
if partID > globalMaxPartID {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxParts, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
return
}
@@ -204,25 +239,31 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
_, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
}
if s3ErrCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, s3ErrCode, r)
+ s3err.WriteErrorResponse(w, r, s3ErrCode)
return
}
}
defer dataReader.Close()
+ glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
+
uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
- s3a.option.Filer, s3a.genUploadsFolder(bucket), uploadID, partID, bucket)
+ s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, bucket)
- etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
+ if partID == 1 && r.Header.Get("Content-Type") == "" {
+ dataReader = mimeDetect(r, dataReader)
+ }
+ destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
+ etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination)
if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, errCode, r)
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
setEtag(w, etag)
- writeSuccessResponseEmpty(w)
+ writeSuccessResponseEmpty(w, r)
}
@@ -230,6 +271,27 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
return fmt.Sprintf("%s/%s/.uploads", s3a.option.BucketsPath, bucket)
}
+// Generate a deterministic uploadID: the SHA-1 hex digest of the object key
+func (s3a *S3ApiServer) generateUploadID(object string) string {
+ if strings.HasPrefix(object, "/") {
+ object = object[1:]
+ }
+ h := sha1.New()
+ h.Write([]byte(object))
+ return fmt.Sprintf("%x", h.Sum(nil))
+}
+
+// checkUploadId verifies that the given uploadID matches the one derived from the object key when processing a multipart upload
+func (s3a *S3ApiServer) checkUploadId(object string, id string) error {
+
+ hash := s3a.generateUploadID(object)
+ if hash != id {
+ glog.Errorf("object %s and uploadID %s are not matched", object, id)
+ return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
+ }
+ return nil
+}
+
// Parse bucket url queries for ?uploads
func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
prefix = values.Get("prefix")
@@ -258,8 +320,24 @@ func getObjectResources(values url.Values) (uploadID string, partNumberMarker, m
return
}
-type byCompletedPartNumber []*s3.CompletedPart
+func xmlDecoder(body io.Reader, v interface{}, size int64) error {
+ var lbody io.Reader
+ if size > 0 {
+ lbody = io.LimitReader(body, size)
+ } else {
+ lbody = body
+ }
+ d := xml.NewDecoder(lbody)
+ d.CharsetReader = func(label string, input io.Reader) (io.Reader, error) {
+ return input, nil
+ }
+ return d.Decode(v)
+}
-func (a byCompletedPartNumber) Len() int { return len(a) }
-func (a byCompletedPartNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a byCompletedPartNumber) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
+type CompleteMultipartUpload struct {
+ Parts []CompletedPart `xml:"Part"`
+}
+type CompletedPart struct {
+ ETag string
+ PartNumber int
+}
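Because generateUploadID is just the SHA-1 hex digest of the object key (leading slash stripped), checkUploadId can validate an upload ID without any metadata lookup; the flip side is that the ID is fully determined by the key. A small sketch reproducing the derivation, with illustrative names:

package main

import (
	"crypto/sha1"
	"fmt"
	"strings"
)

// uploadID mirrors the derivation above: SHA-1 hex of the object key
// with any leading slash stripped.
func uploadID(object string) string {
	object = strings.TrimPrefix(object, "/")
	h := sha1.New()
	h.Write([]byte(object))
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	fmt.Println(uploadID("/some/key.txt"))
	// a client-supplied uploadId that differs maps to ErrNoSuchUpload above
}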
diff --git a/weed/s3api/s3api_object_skip_handlers.go b/weed/s3api/s3api_object_skip_handlers.go
new file mode 100644
index 000000000..160d02475
--- /dev/null
+++ b/weed/s3api/s3api_object_skip_handlers.go
@@ -0,0 +1,45 @@
+package s3api
+
+import (
+ "net/http"
+)
+
+// GetObjectAclHandler Get object ACL
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectAcl.html
+func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Request) {
+
+ w.WriteHeader(http.StatusNoContent)
+
+}
+
+// PutObjectAclHandler Put object ACL
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectAcl.html
+func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Request) {
+
+ w.WriteHeader(http.StatusNoContent)
+
+}
+
+// PutObjectRetentionHandler Put object Retention
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectRetention.html
+func (s3a *S3ApiServer) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
+
+ w.WriteHeader(http.StatusNoContent)
+
+}
+
+// PutObjectLegalHoldHandler Put object Legal Hold
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectLegalHold.html
+func (s3a *S3ApiServer) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
+
+ w.WriteHeader(http.StatusNoContent)
+
+}
+
+// PutObjectLockConfigurationHandler Put object Lock configuration
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectLockConfiguration.html
+func (s3a *S3ApiServer) PutObjectLockConfigurationHandler(w http.ResponseWriter, r *http.Request) {
+
+ w.WriteHeader(http.StatusNoContent)
+
+}
diff --git a/weed/s3api/s3api_object_tagging_handlers.go b/weed/s3api/s3api_object_tagging_handlers.go
index fd3ec2ff7..9fde0309c 100644
--- a/weed/s3api/s3api_object_tagging_handlers.go
+++ b/weed/s3api/s3api_object_tagging_handlers.go
@@ -3,20 +3,22 @@ package s3api
import (
"encoding/xml"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "io"
+ "net/http"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"github.com/chrislusf/seaweedfs/weed/util"
- "io"
- "io/ioutil"
- "net/http"
)
// GetObjectTaggingHandler - GET object tagging
// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObjectTagging.html
func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("GetObjectTaggingHandler %s %s", bucket, object)
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
dir, name := target.DirAndName()
@@ -25,15 +27,15 @@ func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.R
if err != nil {
if err == filer_pb.ErrNotFound {
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
} else {
glog.Errorf("GetObjectTaggingHandler %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
return
}
- writeSuccessResponseXML(w, FromTags(tags))
+ writeSuccessResponseXML(w, r, FromTags(tags))
}
@@ -41,38 +43,39 @@ func (s3a *S3ApiServer) GetObjectTaggingHandler(w http.ResponseWriter, r *http.R
// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObjectTagging.html
func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("PutObjectTaggingHandler %s %s", bucket, object)
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
dir, name := target.DirAndName()
tagging := &Tagging{}
- input, err := ioutil.ReadAll(io.LimitReader(r.Body, r.ContentLength))
+ input, err := io.ReadAll(io.LimitReader(r.Body, r.ContentLength))
if err != nil {
glog.Errorf("PutObjectTaggingHandler read input %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if err = xml.Unmarshal(input, tagging); err != nil {
glog.Errorf("PutObjectTaggingHandler Unmarshal %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrMalformedXML, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
return
}
tags := tagging.ToTags()
if len(tags) > 10 {
glog.Errorf("PutObjectTaggingHandler tags %s: %d tags more than 10", r.URL, len(tags))
- s3err.WriteErrorResponse(w, s3err.ErrInvalidTag, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
return
}
for k, v := range tags {
if len(k) > 128 {
glog.Errorf("PutObjectTaggingHandler tags %s: tag key %s longer than 128", r.URL, k)
- s3err.WriteErrorResponse(w, s3err.ErrInvalidTag, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
return
}
if len(v) > 256 {
glog.Errorf("PutObjectTaggingHandler tags %s: tag value %s longer than 256", r.URL, v)
- s3err.WriteErrorResponse(w, s3err.ErrInvalidTag, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
return
}
}
@@ -80,23 +83,24 @@ func (s3a *S3ApiServer) PutObjectTaggingHandler(w http.ResponseWriter, r *http.R
if err = s3a.setTags(dir, name, tagging.ToTags()); err != nil {
if err == filer_pb.ErrNotFound {
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
} else {
glog.Errorf("PutObjectTaggingHandler setTags %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
return
}
- w.WriteHeader(http.StatusNoContent)
-
+ w.WriteHeader(http.StatusOK)
+ s3err.PostLog(r, http.StatusOK, s3err.ErrNone)
}
// DeleteObjectTaggingHandler Delete object tagging
// API reference: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjectTagging.html
func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
- bucket, object := getBucketAndObject(r)
+ bucket, object := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("DeleteObjectTaggingHandler %s %s", bucket, object)
target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
dir, name := target.DirAndName()
@@ -105,13 +109,14 @@ func (s3a *S3ApiServer) DeleteObjectTaggingHandler(w http.ResponseWriter, r *htt
if err != nil {
if err == filer_pb.ErrNotFound {
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrNoSuchKey, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
} else {
glog.Errorf("DeleteObjectTaggingHandler %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
return
}
w.WriteHeader(http.StatusNoContent)
+ s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
}
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 51a58af6a..6b934bccd 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -5,6 +5,7 @@ import (
"encoding/xml"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"io"
"net/http"
"net/url"
@@ -15,7 +16,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
)
@@ -39,16 +39,17 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
// https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
// collect parameters
- bucket, _ := getBucketAndObject(r)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("ListObjectsV2Handler %s", bucket)
originalPrefix, continuationToken, startAfter, delimiter, _, maxKeys := getListObjectsV2Args(r.URL.Query())
if maxKeys < 0 {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxKeys, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
return
}
if delimiter != "" && delimiter != "/" {
- s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
return
}
@@ -60,13 +61,13 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if len(response.Contents) == 0 {
if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
- s3err.WriteErrorResponse(w, s3err.ErrNoSuchBucket, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
}
@@ -86,7 +87,7 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
StartAfter: startAfter,
}
- writeSuccessResponseXML(w, responseV2)
+ writeSuccessResponseXML(w, r, responseV2)
}
func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
@@ -94,34 +95,35 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
// collect parameters
- bucket, _ := getBucketAndObject(r)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("ListObjectsV1Handler %s", bucket)
originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())
if maxKeys < 0 {
- s3err.WriteErrorResponse(w, s3err.ErrInvalidMaxKeys, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
return
}
if delimiter != "" && delimiter != "/" {
- s3err.WriteErrorResponse(w, s3err.ErrNotImplemented, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
return
}
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
if err != nil {
- s3err.WriteErrorResponse(w, s3err.ErrInternalError, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if len(response.Contents) == 0 {
if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
- s3err.WriteErrorResponse(w, s3err.ErrNoSuchBucket, r)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
}
- writeSuccessResponseXML(w, response)
+ writeSuccessResponseXML(w, r, response)
}
func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
@@ -131,10 +133,10 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
reqDir = reqDir[1:]
}
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
+ bucketPrefixLen := len(bucketPrefix)
reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
if strings.HasSuffix(reqDir, "/") {
- // remove trailing "/"
- reqDir = reqDir[:len(reqDir)-1]
+ reqDir = strings.TrimSuffix(reqDir, "/")
}
var contents []ListEntry
@@ -144,33 +146,36 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
var nextMarker string
// check filer
- err = s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, func(dir string, entry *filer_pb.Entry) {
+ _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, false, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) {
if entry.IsDirectory {
if delimiter == "/" {
commonPrefixes = append(commonPrefixes, PrefixEntry{
- Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
+ Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:],
})
}
- } else {
- storageClass := "STANDARD"
- if v, ok := entry.Extended[xhttp.AmzStorageClass]; ok {
- storageClass = string(v)
+ if !(entry.IsDirectoryKeyObject() && strings.HasSuffix(entry.Name, "/")) {
+ return
}
- contents = append(contents, ListEntry{
- Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
- LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
- ETag: "\"" + filer.ETag(entry) + "\"",
- Size: int64(filer.FileSize(entry)),
- Owner: CanonicalUser{
- ID: fmt.Sprintf("%x", entry.Attributes.Uid),
- DisplayName: entry.Attributes.UserName,
- },
- StorageClass: StorageClass(storageClass),
- })
}
+ storageClass := "STANDARD"
+ if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
+ storageClass = string(v)
+ }
+ contents = append(contents, ListEntry{
+ Key: fmt.Sprintf("%s/%s", dir, entry.Name)[bucketPrefixLen:],
+ LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
+ ETag: "\"" + filer.ETag(entry) + "\"",
+ Size: int64(filer.FileSize(entry)),
+ Owner: CanonicalUser{
+ ID: fmt.Sprintf("%x", entry.Attributes.Uid),
+ DisplayName: entry.Attributes.UserName,
+ },
+ StorageClass: StorageClass(storageClass),
+ })
})
+ glog.V(4).Infof("end doListFilerEntries isTruncated:%v nextMarker:%v reqDir: %v prefix: %v", isTruncated, nextMarker, reqDir, prefix)
if doErr != nil {
return doErr
}
@@ -179,6 +184,39 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
nextMarker = ""
}
+ if len(contents) == 0 && len(commonPrefixes) == 0 && maxKeys > 0 {
+ if strings.HasSuffix(originalPrefix, "/") && prefix == "" {
+ reqDir, prefix = filepath.Split(strings.TrimSuffix(reqDir, "/"))
+ reqDir = strings.TrimSuffix(reqDir, "/")
+ }
+ _, _, _, doErr = s3a.doListFilerEntries(client, reqDir, prefix, 1, prefix, delimiter, true, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) {
+ if entry.IsDirectoryKeyObject() && entry.Name == prefix {
+ storageClass := "STANDARD"
+ if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
+ storageClass = string(v)
+ }
+ contents = append(contents, ListEntry{
+ Key: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:],
+ LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
+ ETag: "\"" + fmt.Sprintf("%x", entry.Attributes.Md5) + "\"",
+ Size: int64(filer.FileSize(entry)),
+ Owner: CanonicalUser{
+ ID: fmt.Sprintf("%x", entry.Attributes.Uid),
+ DisplayName: entry.Attributes.UserName,
+ },
+ StorageClass: StorageClass(storageClass),
+ })
+ }
+ })
+ if doErr != nil {
+ return doErr
+ }
+ }
+
+ if len(nextMarker) > 0 {
+ nextMarker = nextMarker[bucketPrefixLen:]
+ }
+
response = ListBucketResult{
Name: bucket,
Prefix: originalPrefix,
@@ -197,7 +235,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
return
}
-func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
+func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, inclusiveStartFrom bool, subEntries bool, bucketPrefixLen int, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
// invariants
// prefix and marker should be under dir, marker may contain "/"
// maxKeys should be updated for each recursion
@@ -210,19 +248,30 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
}
if strings.Contains(marker, "/") {
+ marker = strings.TrimSuffix(marker, "/")
sepIndex := strings.Index(marker, "/")
- subDir, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
- // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker, "maxKeys", maxKeys)
- subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", maxKeys, subMarker, delimiter, eachEntryFn)
- if subErr != nil {
- err = subErr
- return
+ if sepIndex != -1 {
+ subPrefix, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
+ subDir := fmt.Sprintf("%s/%s", dir[0:bucketPrefixLen-1], subPrefix)
+ if strings.HasPrefix(subDir, dir) {
+ subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, subDir, "", maxKeys, subMarker, delimiter, false, false, bucketPrefixLen, eachEntryFn)
+ if subErr != nil {
+ err = subErr
+ return
+ }
+ counter += subCounter
+ isTruncated = isTruncated || subIsTruncated
+ maxKeys -= subCounter
+ nextMarker = subNextMarker
+ // finished processing this sub directory
+ marker = subPrefix
+ }
}
- isTruncated = isTruncated || subIsTruncated
- maxKeys -= subCounter
- nextMarker = subDir + "/" + subNextMarker
- // finished processing this sub directory
- marker = subDir
+ }
+ if maxKeys <= 0 {
+ return
}
// now marker is also a direct child of dir
@@ -231,7 +280,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
Prefix: prefix,
Limit: uint32(maxKeys + 1),
StartFromFileName: marker,
- InclusiveStartFrom: false,
+ InclusiveStartFrom: inclusiveStartFrom,
}
ctx, cancel := context.WithCancel(context.Background())
@@ -257,39 +306,46 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
return
}
entry := resp.Entry
- nextMarker = entry.Name
+ nextMarker = dir + "/" + entry.Name
if entry.IsDirectory {
// println("ListEntries", dir, "dir:", entry.Name)
- if entry.Name != ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
- if delimiter != "/" {
+ if entry.Name == ".uploads" { // FIXME no need to apply to all directories. this extra also affects maxKeys
+ continue
+ }
+ if delimiter == "" {
+ eachEntryFn(dir, entry)
+ // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
+ subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, false, true, bucketPrefixLen, eachEntryFn)
+ if subErr != nil {
+ err = fmt.Errorf("doListFilerEntries2: %v", subErr)
+ return
+ }
+ // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
+ if subCounter == 0 && entry.IsDirectoryKeyObject() {
+ entry.Name += "/"
eachEntryFn(dir, entry)
- // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
- subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, eachEntryFn)
- if subErr != nil {
- err = fmt.Errorf("doListFilerEntries2: %v", subErr)
- return
- }
- // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
- counter += subCounter
- nextMarker = entry.Name + "/" + subNextMarker
- if subIsTruncated {
- isTruncated = true
- return
- }
- } else {
- var isEmpty bool
- if !s3a.option.AllowEmptyFolder {
- if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
- glog.Errorf("check empty folder %s: %v", dir, err)
- }
- }
- if !isEmpty {
- eachEntryFn(dir, entry)
- counter++
+ counter++
+ }
+ counter += subCounter
+ nextMarker = subNextMarker
+ if subIsTruncated {
+ isTruncated = true
+ return
+ }
+ } else if delimiter == "/" {
+ var isEmpty bool
+ if !s3a.option.AllowEmptyFolder && !entry.IsDirectoryKeyObject() {
+ if isEmpty, err = s3a.isDirectoryAllEmpty(client, dir, entry.Name); err != nil {
+ glog.Errorf("check empty folder %s: %v", dir, err)
}
}
+ if !isEmpty {
+ nextMarker += "/"
+ eachEntryFn(dir, entry)
+ counter++
+ }
}
- } else {
+ } else if !(delimiter == "/" && subEntries) {
// println("ListEntries", dir, "file:", entry.Name)
eachEntryFn(dir, entry)
counter++
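
The marker-splitting branch above is the subtle part of this recursion: a marker that still contains "/" is peeled one path segment at a time, and `dir[0:bucketPrefixLen-1]` re-anchors the sub-listing at the bucket root. A standalone sketch with purely illustrative values:

```go
package main

import (
	"fmt"
	"strings"
)

// Sketch: how doListFilerEntries derives the sub-listing from a marker that
// still contains "/".
func main() {
	dir := "/buckets/photos"                   // current listing directory
	bucketPrefixLen := len("/buckets/photos/") // counts the trailing "/"
	marker := "2019/08/puppy.jpg"

	marker = strings.TrimSuffix(marker, "/")
	sepIndex := strings.Index(marker, "/")
	subPrefix, subMarker := marker[:sepIndex], marker[sepIndex+1:]
	// dir[0:bucketPrefixLen-1] is the bucket root without its trailing "/"
	subDir := fmt.Sprintf("%s/%s", dir[0:bucketPrefixLen-1], subPrefix)

	fmt.Println(subDir, subMarker)
	// /buckets/photos/2019 08/puppy.jpg
	// i.e. recurse into /buckets/photos/2019 with marker "08/puppy.jpg",
	// then continue in dir itself from marker "2019".
}
```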
diff --git a/weed/s3api/s3api_policy.go b/weed/s3api/s3api_policy.go
new file mode 100644
index 000000000..6e2c8cfa2
--- /dev/null
+++ b/weed/s3api/s3api_policy.go
@@ -0,0 +1,148 @@
+package s3api
+
+import (
+ "encoding/xml"
+ "time"
+)
+
+// ruleStatus represents lifecycle configuration rule status
+type ruleStatus string
+
+// Supported status types
+const (
+ Enabled ruleStatus = "Enabled"
+ Disabled ruleStatus = "Disabled"
+)
+
+// Lifecycle - Configuration for bucket lifecycle.
+type Lifecycle struct {
+ XMLName xml.Name `xml:"LifecycleConfiguration"`
+ Rules []Rule `xml:"Rule"`
+}
+
+// Rule - a rule for lifecycle configuration.
+type Rule struct {
+ XMLName xml.Name `xml:"Rule"`
+ ID string `xml:"ID,omitempty"`
+ Status ruleStatus `xml:"Status"`
+ Filter Filter `xml:"Filter,omitempty"`
+ Prefix Prefix `xml:"Prefix,omitempty"`
+ Expiration Expiration `xml:"Expiration,omitempty"`
+ Transition Transition `xml:"Transition,omitempty"`
+}
+
+// Filter - a filter for a lifecycle configuration Rule.
+type Filter struct {
+ XMLName xml.Name `xml:"Filter"`
+ set bool
+
+ Prefix Prefix
+
+ And And
+ andSet bool
+
+ Tag Tag
+ tagSet bool
+}
+
+// Prefix holds the prefix xml tag in <Rule> and <Filter>
+type Prefix struct {
+ string
+ set bool
+}
+
+// MarshalXML encodes Prefix field into an XML form.
+func (p Prefix) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error {
+ if !p.set {
+ return nil
+ }
+ return e.EncodeElement(p.string, startElement)
+}
+
+// MarshalXML encodes Filter field into an XML form.
+func (f Filter) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
+ if err := e.EncodeToken(start); err != nil {
+ return err
+ }
+ if err := e.EncodeElement(f.Prefix, xml.StartElement{Name: xml.Name{Local: "Prefix"}}); err != nil {
+ return err
+ }
+ return e.EncodeToken(xml.EndElement{Name: start.Name})
+}
+
+// And - a tag to combine a prefix and multiple tags for lifecycle configuration rule.
+type And struct {
+ XMLName xml.Name `xml:"And"`
+ Prefix Prefix `xml:"Prefix,omitempty"`
+ Tags []Tag `xml:"Tag,omitempty"`
+}
+
+// Expiration - expiration actions for a rule in lifecycle configuration.
+type Expiration struct {
+ XMLName xml.Name `xml:"Expiration"`
+ Days int `xml:"Days,omitempty"`
+ Date ExpirationDate `xml:"Date,omitempty"`
+ DeleteMarker ExpireDeleteMarker `xml:"ExpiredObjectDeleteMarker"`
+
+ set bool
+}
+
+// MarshalXML encodes expiration field into an XML form.
+func (e Expiration) MarshalXML(enc *xml.Encoder, startElement xml.StartElement) error {
+ if !e.set {
+ return nil
+ }
+ type expirationWrapper Expiration
+ return enc.EncodeElement(expirationWrapper(e), startElement)
+}
+
+// ExpireDeleteMarker represents value of ExpiredObjectDeleteMarker field in Expiration XML element.
+type ExpireDeleteMarker struct {
+ val bool
+ set bool
+}
+
+// MarshalXML encodes delete marker boolean into an XML form.
+func (b ExpireDeleteMarker) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error {
+ if !b.set {
+ return nil
+ }
+ return e.EncodeElement(b.val, startElement)
+}
+
+// ExpirationDate is an embedded type containing time.Time to unmarshal
+// Date in Expiration
+type ExpirationDate struct {
+ time.Time
+}
+
+// MarshalXML encodes the expiration date if it is non-zero and emits
+// nothing otherwise
+func (eDate ExpirationDate) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error {
+ if eDate.Time.IsZero() {
+ return nil
+ }
+ return e.EncodeElement(eDate.Format(time.RFC3339), startElement)
+}
+
+// Transition - transition actions for a rule in lifecycle configuration.
+type Transition struct {
+ XMLName xml.Name `xml:"Transition"`
+ Days int `xml:"Days,omitempty"`
+ Date time.Time `xml:"Date,omitempty"`
+ StorageClass string `xml:"StorageClass,omitempty"`
+
+ set bool
+}
+
+// MarshalXML encodes transition field into an XML form.
+func (t Transition) MarshalXML(enc *xml.Encoder, start xml.StartElement) error {
+ if !t.set {
+ return nil
+ }
+ type transitionWrapper Transition
+ return enc.EncodeElement(transitionWrapper(t), start)
+}
+
+// TransitionDays is a named int type used to unmarshal Days in Transition
+type TransitionDays int
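
The unexported `set` flags let the custom marshalers suppress elements that were never configured, instead of emitting zero values. A test-file sketch in package s3api (field values are illustrative):

```go
package s3api

import (
	"encoding/xml"
	"fmt"
)

// Sketch: a rule without Expiration or Transition marshals to just its ID,
// Status and Filter, because the unset marshalers emit nothing.
func ExampleLifecycle() {
	lc := Lifecycle{
		Rules: []Rule{{
			ID:     "expire-logs",
			Status: Enabled,
			Filter: Filter{Prefix: Prefix{string: "logs/", set: true}},
		}},
	}
	out, _ := xml.MarshalIndent(lc, "", "  ")
	fmt.Println(string(out))
	// <LifecycleConfiguration>
	//   <Rule>
	//     <ID>expire-logs</ID>
	//     <Status>Enabled</Status>
	//     <Filter>
	//       <Prefix>logs/</Prefix>
	//     </Filter>
	//   </Rule>
	// </LifecycleConfiguration>
}
```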
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 27259c4a7..cc5ca5231 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -1,44 +1,81 @@
package s3api
import (
+ "context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
+ "net"
"net/http"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb"
. "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"google.golang.org/grpc"
)
type S3ApiServerOption struct {
- Filer string
- Port int
- FilerGrpcAddress string
- Config string
- DomainName string
- BucketsPath string
- GrpcDialOption grpc.DialOption
- AllowEmptyFolder bool
+ Filer pb.ServerAddress
+ Port int
+ Config string
+ DomainName string
+ BucketsPath string
+ GrpcDialOption grpc.DialOption
+ AllowEmptyFolder bool
+ AllowDeleteBucketNotEmpty bool
+ LocalFilerSocket *string
}
type S3ApiServer struct {
- option *S3ApiServerOption
- iam *IdentityAccessManagement
+ s3_pb.UnimplementedSeaweedS3Server
+ option *S3ApiServerOption
+ iam *IdentityAccessManagement
+ cb *CircuitBreaker
+ randomClientId int32
+ filerGuard *security.Guard
+ client *http.Client
}
func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
+ v := util.GetViper()
+ signingKey := v.GetString("jwt.filer_signing.key")
+ v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
+
+ readSigningKey := v.GetString("jwt.filer_signing.read.key")
+ v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
+
s3ApiServer = &S3ApiServer{
- option: option,
- iam: NewIdentityAccessManagement(option),
+ option: option,
+ iam: NewIdentityAccessManagement(option),
+ randomClientId: util.RandomInt32(),
+ filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
+ cb: NewCircuitBreaker(option),
+ }
+ if option.LocalFilerSocket == nil || *option.LocalFilerSocket == "" {
+ s3ApiServer.client = &http.Client{Transport: &http.Transport{
+ MaxIdleConns: 1024,
+ MaxIdleConnsPerHost: 1024,
+ }}
+ } else {
+ s3ApiServer.client = &http.Client{
+ Transport: &http.Transport{
+ DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
+ return net.Dial("unix", *option.LocalFilerSocket)
+ },
+ },
+ }
}
s3ApiServer.registerRouter(router)
- go s3ApiServer.subscribeMetaEvents("s3", filer.IamConfigDirecotry+"/"+filer.IamIdentityFile, time.Now().UnixNano())
-
+ go s3ApiServer.subscribeMetaEvents("s3", filer.DirectoryEtcRoot, time.Now().UnixNano())
return s3ApiServer, nil
}
@@ -63,73 +100,126 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
for _, bucket := range routers {
- // HeadObject
- bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.HeadObjectHandler, ACTION_READ), "GET"))
- // HeadBucket
- bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.HeadBucketHandler, ACTION_ADMIN), "GET"))
+ // route registration must follow these ordering rules:
+ // - object routes with a query must precede all other object routes
+ // - object routes must precede bucket routes
+ // - bucket routes with a query must precede raw bucket routes
+ // - raw bucket routes must be registered last
+
+ // objects with query
// CopyObjectPart
- bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
+ bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// PutObjectPart
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectPartHandler, ACTION_WRITE), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
// CompleteMultipartUpload
- bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.CompleteMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}")
// NewMultipartUpload
- bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.NewMultipartUploadHandler, ACTION_WRITE), "POST")).Queries("uploads", "")
+ bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "")
// AbortMultipartUpload
- bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.AbortMultipartUploadHandler, ACTION_WRITE), "DELETE")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}")
// ListObjectParts
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectPartsHandler, ACTION_READ), "GET")).Queries("uploadId", "{uploadId:.*}")
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}")
// ListMultipartUploads
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListMultipartUploadsHandler, ACTION_READ), "GET")).Queries("uploads", "")
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "")
// GetObjectTagging
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectTaggingHandler, ACTION_READ), "GET")).Queries("tagging", "")
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "")
// PutObjectTagging
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectTaggingHandler, ACTION_TAGGING), "PUT")).Queries("tagging", "")
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectTaggingHandler, ACTION_TAGGING)), "PUT")).Queries("tagging", "")
// DeleteObjectTagging
- bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING), "DELETE")).Queries("tagging", "")
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "")
+
+ // PutObjectACL
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "")
+ // PutObjectRetention
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "")
+ // PutObjectLegalHold
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLegalHoldHandler, ACTION_WRITE)), "PUT")).Queries("legal-hold", "")
+ // PutObjectLockConfiguration
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "")
+
+ // GetObjectACL
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ)), "GET")).Queries("acl", "")
+
+ // objects with query
+
+ // raw objects
+
+ // HeadObject
+ bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadObjectHandler, ACTION_READ)), "GET"))
+
+ // GetObject, but directory listing is not supported
+ bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectHandler, ACTION_READ)), "GET"))
// CopyObject
- bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.CopyObjectHandler, ACTION_WRITE), "COPY"))
+ bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectHandler, ACTION_WRITE)), "COPY"))
// PutObject
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.PutObjectHandler, ACTION_WRITE), "PUT"))
- // PutBucket
- bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketHandler, ACTION_ADMIN), "PUT"))
-
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectHandler, ACTION_WRITE)), "PUT"))
// DeleteObject
- bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteObjectHandler, ACTION_WRITE), "DELETE"))
- // DeleteBucket
- bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketHandler, ACTION_WRITE), "DELETE"))
+ bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectHandler, ACTION_WRITE)), "DELETE"))
+
+ // raw objects
+
+ // buckets with query
+
+ // DeleteMultipleObjects
+ bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "")
+
+ // GetBucketACL
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ)), "GET")).Queries("acl", "")
+ // PutBucketACL
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "")
+
+ // GetBucketPolicy
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "")
+ // PutBucketPolicy
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketPolicyHandler, ACTION_WRITE)), "PUT")).Queries("policy", "")
+ // DeleteBucketPolicy
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketPolicyHandler, ACTION_WRITE)), "DELETE")).Queries("policy", "")
+
+ // GetBucketCors
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketCorsHandler, ACTION_READ)), "GET")).Queries("cors", "")
+ // PutBucketCors
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketCorsHandler, ACTION_WRITE)), "PUT")).Queries("cors", "")
+ // DeleteBucketCors
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketCorsHandler, ACTION_WRITE)), "DELETE")).Queries("cors", "")
+
+ // GetBucketLifecycleConfiguration
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ)), "GET")).Queries("lifecycle", "")
+ // PutBucketLifecycleConfiguration
+ bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("lifecycle", "")
+ // DeleteBucketLifecycleConfiguration
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE)), "DELETE")).Queries("lifecycle", "")
+
+ // GetBucketLocation
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLocationHandler, ACTION_READ)), "GET")).Queries("location", "")
+
+ // GetBucketRequestPayment
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketRequestPaymentHandler, ACTION_READ)), "GET")).Queries("requestPayment", "")
// ListObjectsV2
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV2Handler, ACTION_LIST), "LIST")).Queries("list-type", "2")
- // GetObject, but directory listing is not supported
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.GetObjectHandler, ACTION_READ), "GET"))
- // ListObjectsV1 (Legacy)
- bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.ListObjectsV1Handler, ACTION_LIST), "LIST"))
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2")
+
+ // buckets with query
+
+ // raw buckets
// PostPolicy
- bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.PostPolicyBucketHandler, ACTION_WRITE), "POST"))
+ bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PostPolicyBucketHandler, ACTION_WRITE)), "POST"))
- // DeleteMultipleObjects
- bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE), "DELETE")).Queries("delete", "")
- /*
-
- // not implemented
- // GetBucketLocation
- bucket.Methods("GET").HandlerFunc(s3a.GetBucketLocationHandler).Queries("location", "")
- // GetBucketPolicy
- bucket.Methods("GET").HandlerFunc(s3a.GetBucketPolicyHandler).Queries("policy", "")
- // GetObjectACL
- bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(s3a.GetObjectACLHandler).Queries("acl", "")
- // GetBucketACL
- bucket.Methods("GET").HandlerFunc(s3a.GetBucketACLHandler).Queries("acl", "")
- // PutBucketPolicy
- bucket.Methods("PUT").HandlerFunc(s3a.PutBucketPolicyHandler).Queries("policy", "")
- // DeleteBucketPolicy
- bucket.Methods("DELETE").HandlerFunc(s3a.DeleteBucketPolicyHandler).Queries("policy", "")
- */
+ // HeadBucket
+ bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadBucketHandler, ACTION_READ)), "GET"))
+
+ // PutBucket
+ bucket.Methods("PUT").HandlerFunc(track(s3a.PutBucketHandler, "PUT"))
+ // DeleteBucket
+ bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_WRITE)), "DELETE"))
+
+ // ListObjectsV1 (Legacy)
+ bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST"))
+
+ // raw buckets
}
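
The reordering matters because gorilla/mux dispatches to the first registered route that matches, so query-scoped routes must come before their catch-all siblings. A self-contained sketch of the effect (paths and handlers are illustrative):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/gorilla/mux"
)

// Sketch: the tagging route is registered first, so "?tagging" requests no
// longer fall through to the raw GetObject route.
func main() {
	router := mux.NewRouter()
	bucket := router.PathPrefix("/{bucket}").Subrouter()

	bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "GetObjectTagging") },
	).Queries("tagging", "")
	bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "GetObject") },
	)

	for _, target := range []string{"/b/k?tagging", "/b/k"} {
		w := httptest.NewRecorder()
		router.ServeHTTP(w, httptest.NewRequest("GET", target, nil))
		fmt.Println(target, "->", w.Body.String())
	}
	// /b/k?tagging -> GetObjectTagging
	// /b/k -> GetObject
}
```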
diff --git a/weed/s3api/s3api_server_grpc.go b/weed/s3api/s3api_server_grpc.go
new file mode 100644
index 000000000..e93d0056f
--- /dev/null
+++ b/weed/s3api/s3api_server_grpc.go
@@ -0,0 +1,16 @@
+package s3api
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
+)
+
+func (s3a *S3ApiServer) Configure(ctx context.Context, request *s3_pb.S3ConfigureRequest) (*s3_pb.S3ConfigureResponse, error) {
+
+ if err := s3a.iam.LoadS3ApiConfigurationFromBytes(request.S3ConfigurationFileContent); err != nil {
+ return nil, err
+ }
+
+ return &s3_pb.S3ConfigureResponse{}, nil
+
+}
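
This makes the S3 gateway reconfigurable at runtime over gRPC. A hypothetical client sketch: the `NewSeaweedS3Client` constructor name is inferred from standard protoc-gen-go-grpc output for the `SeaweedS3` service, and the address and file name are assumptions:

```go
package main

import (
	"context"
	"os"

	"github.com/chrislusf/seaweedfs/weed/pb/s3_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// Sketch: push a new identities file to a running gateway; it takes effect
// via iam.LoadS3ApiConfigurationFromBytes without a restart.
func main() {
	conn, err := grpc.Dial("localhost:18333", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	content, err := os.ReadFile("identities.json") // hypothetical config file
	if err != nil {
		panic(err)
	}

	client := s3_pb.NewSeaweedS3Client(conn)
	if _, err := client.Configure(context.Background(), &s3_pb.S3ConfigureRequest{
		S3ConfigurationFileContent: content,
	}); err != nil {
		panic(err)
	}
}
```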
diff --git a/weed/s3api/s3api_status_handlers.go b/weed/s3api/s3api_status_handlers.go
index 914c27f40..fafb6ac2f 100644
--- a/weed/s3api/s3api_status_handlers.go
+++ b/weed/s3api/s3api_status_handlers.go
@@ -1,8 +1,11 @@
package s3api
-import "net/http"
+import (
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "net/http"
+)
func (s3a *S3ApiServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
// write out the response code and content type header
- writeSuccessResponseEmpty(w)
+ s3err.WriteResponse(w, r, http.StatusOK, []byte{}, "")
}
diff --git a/weed/s3api/s3api_xsd_generated.go b/weed/s3api/s3api_xsd_generated.go
index 9d62afc4e..dd6a32ff2 100644
--- a/weed/s3api/s3api_xsd_generated.go
+++ b/weed/s3api/s3api_xsd_generated.go
@@ -8,12 +8,12 @@ import (
)
type AccessControlList struct {
- Grant []Grant `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Grant,omitempty"`
+ Grant []Grant `xml:"Grant,omitempty"`
}
type AccessControlPolicy struct {
- Owner CanonicalUser `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Owner"`
- AccessControlList AccessControlList `xml:"http://s3.amazonaws.com/doc/2006-03-01/ AccessControlList"`
+ Owner CanonicalUser `xml:"Owner"`
+ AccessControlList AccessControlList `xml:"AccessControlList"`
}
type AmazonCustomerByEmail struct {
@@ -467,11 +467,17 @@ func (t *GetObjectResult) UnmarshalXML(d *xml.Decoder, start xml.StartElement) e
}
type Grant struct {
- Grantee Grantee `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Grantee"`
- Permission Permission `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Permission"`
+ Grantee Grantee `xml:"Grantee"`
+ Permission Permission `xml:"Permission"`
}
type Grantee struct {
+ XMLNS string `xml:"xmlns:xsi,attr"`
+ XMLXSI string `xml:"xsi:type,attr"`
+ Type string `xml:"Type"`
+ ID string `xml:"ID,omitempty"`
+ DisplayName string `xml:"DisplayName,omitempty"`
+ URI string `xml:"URI,omitempty"`
}
type Group struct {
@@ -640,6 +646,10 @@ type ListVersionsResult struct {
CommonPrefixes []PrefixEntry `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CommonPrefixes,omitempty"`
}
+type LocationConstraint struct {
+ LocationConstraint string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LocationConstraint"`
+}
+
type LoggingSettings struct {
TargetBucket string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ TargetBucket"`
TargetPrefix string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ TargetPrefix"`
diff --git a/weed/s3api/s3err/audit_fluent.go b/weed/s3api/s3err/audit_fluent.go
new file mode 100644
index 000000000..2deb56896
--- /dev/null
+++ b/weed/s3api/s3err/audit_fluent.go
@@ -0,0 +1,183 @@
+package s3err
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/fluent/fluent-logger-golang/fluent"
+ "net/http"
+ "os"
+ "time"
+)
+
+type AccessLogExtend struct {
+ AccessLog
+ AccessLogHTTP
+}
+
+type AccessLog struct {
+ Bucket string `msg:"bucket" json:"bucket"` // awsexamplebucket1
+ Time int64 `msg:"time" json:"time"` // [06/Feb/2019:00:00:38 +0000]
+ RemoteIP string `msg:"remote_ip" json:"remote_ip,omitempty"` // 192.0.2.3
+ Requester string `msg:"requester" json:"requester,omitempty"` // IAM user id
+ RequestID string `msg:"request_id" json:"request_id,omitempty"` // 3E57427F33A59F07
+ Operation string `msg:"operation" json:"operation,omitempty"` // REST.HTTP_method.resource_type REST.PUT.OBJECT
+ Key string `msg:"key" json:"key,omitempty"` // /photos/2019/08/puppy.jpg
+ ErrorCode string `msg:"error_code" json:"error_code,omitempty"`
+ HostId string `msg:"host_id" json:"host_id,omitempty"`
+ HostHeader string `msg:"host_header" json:"host_header,omitempty"` // s3.us-west-2.amazonaws.com
+ UserAgent string `msg:"user_agent" json:"user_agent,omitempty"`
+ HTTPStatus int `msg:"status" json:"status,omitempty"`
+ SignatureVersion string `msg:"signature_version" json:"signature_version,omitempty"`
+}
+
+type AccessLogHTTP struct {
+ RequestURI string `json:"request_uri,omitempty"` // "GET /awsexamplebucket1/photos/2019/08/puppy.jpg?x-foo=bar HTTP/1.1"
+ BytesSent string `json:"bytes_sent,omitempty"`
+ ObjectSize string `json:"object_size,omitempty"`
+ TotalTime int `json:"total_time,omitempty"`
+ TurnAroundTime int `json:"turn_around_time,omitempty"`
+ Referer string `json:"Referer,omitempty"`
+ VersionId string `json:"version_id,omitempty"`
+ CipherSuite string `json:"cipher_suite,omitempty"`
+ AuthenticationType string `json:"auth_type,omitempty"`
+ TLSVersion string `json:"TLS_version,omitempty"`
+}
+
+const tag = "s3.access"
+
+var (
+ Logger *fluent.Fluent
+ hostname = os.Getenv("HOSTNAME")
+ environment = os.Getenv("ENVIRONMENT")
+)
+
+func InitAuditLog(config string) {
+ configContent, readErr := os.ReadFile(config)
+ if readErr != nil {
+ glog.Errorf("fail to read fluent config %s : %v", config, readErr)
+ return
+ }
+ fluentConfig := &fluent.Config{}
+ if err := json.Unmarshal(configContent, fluentConfig); err != nil {
+ glog.Errorf("fail to parse fluent config %s : %v", string(configContent), err)
+ return
+ }
+ if len(fluentConfig.TagPrefix) == 0 && len(environment) > 0 {
+ fluentConfig.TagPrefix = environment
+ }
+ fluentConfig.Async = true
+ fluentConfig.AsyncResultCallback = func(data []byte, err error) {
+ if err != nil {
+ glog.Warning("Error while posting log: ", err)
+ }
+ }
+ var err error
+ Logger, err = fluent.New(*fluentConfig)
+ if err != nil {
+ glog.Errorf("fail to load fluent config: %v", err)
+ }
+}
+
+func getREST(httpMethod string, resourceType string) string {
+ return fmt.Sprintf("REST.%s.%s", httpMethod, resourceType)
+}
+
+func getResourceType(object string, queryKey string, method string) (string, bool) {
+ if object == "/" {
+ switch queryKey {
+ case "delete":
+ return "BATCH.DELETE.OBJECT", true
+ case "tagging":
+ return getREST(method, "OBJECTTAGGING"), true
+ case "lifecycle":
+ return getREST(method, "LIFECYCLECONFIGURATION"), true
+ case "acl":
+ return getREST(method, "ACCESSCONTROLPOLICY"), true
+ case "policy":
+ return getREST(method, "BUCKETPOLICY"), true
+ default:
+ return getREST(method, "BUCKET"), false
+ }
+ } else {
+ switch queryKey {
+ case "tagging":
+ return getREST(method, "OBJECTTAGGING"), true
+ default:
+ return getREST(method, "OBJECT"), false
+ }
+ }
+}
+
+func getOperation(object string, r *http.Request) string {
+ queries := r.URL.Query()
+ var operation string
+ var queryFound bool
+ for key := range queries {
+ operation, queryFound = getResourceType(object, key, r.Method)
+ if queryFound {
+ return operation
+ }
+ }
+ if len(queries) == 0 {
+ operation, _ = getResourceType(object, "", r.Method)
+ }
+ return operation
+}
+
+func GetAccessHttpLog(r *http.Request, statusCode int, s3errCode ErrorCode) AccessLogHTTP {
+ return AccessLogHTTP{
+ RequestURI: r.RequestURI,
+ Referer: r.Header.Get("Referer"),
+ }
+}
+
+func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *AccessLog {
+ bucket, key := s3_constants.GetBucketAndObject(r)
+ var errorCode string
+ if s3errCode != ErrNone {
+ errorCode = GetAPIError(s3errCode).Code
+ }
+ remoteIP := r.Header.Get("X-Real-IP")
+ if len(remoteIP) == 0 {
+ remoteIP = r.RemoteAddr
+ }
+ hostHeader := r.Header.Get("X-Forwarded-Host")
+ if len(hostHeader) == 0 {
+ hostHeader = r.Host
+ }
+ return &AccessLog{
+ HostHeader: hostHeader,
+ RequestID: r.Header.Get("X-Request-ID"),
+ RemoteIP: remoteIP,
+ Requester: r.Header.Get(s3_constants.AmzIdentityId),
+ SignatureVersion: r.Header.Get(s3_constants.AmzAuthType),
+ UserAgent: r.Header.Get("user-agent"),
+ HostId: hostname,
+ Bucket: bucket,
+ HTTPStatus: HTTPStatusCode,
+ Time: time.Now().Unix(),
+ Key: key,
+ Operation: getOperation(key, r),
+ ErrorCode: errorCode,
+ }
+}
+
+func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) {
+ if Logger == nil {
+ return
+ }
+ if err := Logger.Post(tag, *GetAccessLog(r, HTTPStatusCode, errorCode)); err != nil {
+ glog.Warning("Error while posting log: ", err)
+ }
+}
+
+func PostAccessLog(log AccessLog) {
+ if Logger == nil || len(log.Key) == 0 {
+ return
+ }
+ if err := Logger.Post(tag, log); err != nil {
+ glog.Warning("Error while posting log: ", err)
+ }
+}
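
InitAuditLog reads a JSON file and unmarshals it straight into `fluent.Config`, so the accepted keys are whatever json tags that struct carries in the fluent-logger-golang version pinned in go.mod; the `fluent_host`/`fluent_port` keys below are assumptions to verify against that version:

```go
package main

import (
	"os"

	"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
)

// Sketch: wire up audit logging at startup. On any read/parse/connect error,
// InitAuditLog only logs and leaves s3err.Logger nil, so PostLog stays a no-op.
func main() {
	config := []byte(`{"fluent_host": "127.0.0.1", "fluent_port": 24224}`) // assumed keys
	f, err := os.CreateTemp("", "fluent-*.json")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	if _, err := f.Write(config); err != nil {
		panic(err)
	}
	f.Close()

	s3err.InitAuditLog(f.Name())
}
```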
diff --git a/weed/s3api/s3err/error_handler.go b/weed/s3api/s3err/error_handler.go
index c1065fffc..6753a1641 100644
--- a/weed/s3api/s3err/error_handler.go
+++ b/weed/s3api/s3err/error_handler.go
@@ -19,15 +19,16 @@ const (
MimeXML mimeType = "application/xml"
)
-func WriteXMLResponse(w http.ResponseWriter, statusCode int, response interface{}) {
- WriteResponse(w, statusCode, EncodeXMLResponse(response), MimeXML)
+func WriteXMLResponse(w http.ResponseWriter, r *http.Request, statusCode int, response interface{}) {
+ WriteResponse(w, r, statusCode, EncodeXMLResponse(response), MimeXML)
}
-func WriteEmptyResponse(w http.ResponseWriter, statusCode int) {
- WriteResponse(w, statusCode, []byte{}, mimeNone)
+func WriteEmptyResponse(w http.ResponseWriter, r *http.Request, statusCode int) {
+ WriteResponse(w, r, statusCode, []byte{}, mimeNone)
+ PostLog(r, statusCode, ErrNone)
}
-func WriteErrorResponse(w http.ResponseWriter, errorCode ErrorCode, r *http.Request) {
+func WriteErrorResponse(w http.ResponseWriter, r *http.Request, errorCode ErrorCode) {
vars := mux.Vars(r)
bucket := vars["bucket"]
object := vars["object"]
@@ -38,7 +39,8 @@ func WriteErrorResponse(w http.ResponseWriter, errorCode ErrorCode, r *http.Requ
apiError := GetAPIError(errorCode)
errorResponse := getRESTErrorResponse(apiError, r.URL.Path, bucket, object)
encodedErrorResponse := EncodeXMLResponse(errorResponse)
- WriteResponse(w, apiError.HTTPStatusCode, encodedErrorResponse, MimeXML)
+ WriteResponse(w, r, apiError.HTTPStatusCode, encodedErrorResponse, MimeXML)
+ PostLog(r, apiError.HTTPStatusCode, errorCode)
}
func getRESTErrorResponse(err APIError, resource string, bucket, object string) RESTErrorResponse {
@@ -61,13 +63,17 @@ func EncodeXMLResponse(response interface{}) []byte {
return bytesBuffer.Bytes()
}
-func setCommonHeaders(w http.ResponseWriter) {
+func setCommonHeaders(w http.ResponseWriter, r *http.Request) {
w.Header().Set("x-amz-request-id", fmt.Sprintf("%d", time.Now().UnixNano()))
w.Header().Set("Accept-Ranges", "bytes")
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
}
-func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType mimeType) {
- setCommonHeaders(w)
+func WriteResponse(w http.ResponseWriter, r *http.Request, statusCode int, response []byte, mType mimeType) {
+ setCommonHeaders(w, r)
if response != nil {
w.Header().Set("Content-Length", strconv.Itoa(len(response)))
}
@@ -88,5 +94,5 @@ func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType
// If none of the http routes match respond with MethodNotAllowed
func NotFoundHandler(w http.ResponseWriter, r *http.Request) {
glog.V(0).Infof("unsupported %s %s", r.Method, r.RequestURI)
- WriteErrorResponse(w, ErrMethodNotAllowed, r)
+ WriteErrorResponse(w, r, ErrMethodNotAllowed)
}
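
The signature change threads `*http.Request` through every response path so the writers can feed PostLog with request-derived fields. A minimal handler sketch using the new argument order (the handler and error choice are illustrative):

```go
package s3api

import (
	"net/http"

	"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
)

// Sketch: with (w, r, code) ordering, both the error and the success path
// log against the same request.
func exampleHandler(w http.ResponseWriter, r *http.Request) {
	if r.URL.Query().Get("uploadId") == "" {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
		return
	}
	s3err.WriteEmptyResponse(w, r, http.StatusOK)
}
```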
diff --git a/weed/s3api/s3err/s3-error.go b/weed/s3api/s3err/s3-error.go
index 224378ec5..b87764742 100644
--- a/weed/s3api/s3err/s3-error.go
+++ b/weed/s3api/s3err/s3-error.go
@@ -58,4 +58,5 @@ var s3ErrorResponseMap = map[string]string{
"InvalidDuration": "Duration provided in the request is invalid.",
"XAmzContentSHA256Mismatch": "The provided 'x-amz-content-sha256' header does not match what was computed.",
// Add new API errors here.
+ "NoSuchCORSConfiguration": "The CORS configuration does not exist",
}
diff --git a/weed/s3api/s3err/s3api_errors.go b/weed/s3api/s3err/s3api_errors.go
index a46bd0f04..57f269a2e 100644
--- a/weed/s3api/s3err/s3api_errors.go
+++ b/weed/s3api/s3err/s3api_errors.go
@@ -51,6 +51,9 @@ const (
ErrBucketAlreadyExists
ErrBucketAlreadyOwnedByYou
ErrNoSuchBucket
+ ErrNoSuchBucketPolicy
+ ErrNoSuchCORSConfiguration
+ ErrNoSuchLifecycleConfiguration
ErrNoSuchKey
ErrNoSuchUpload
ErrInvalidBucketName
@@ -58,8 +61,10 @@ const (
ErrInvalidMaxKeys
ErrInvalidMaxUploads
ErrInvalidMaxParts
+ ErrInvalidMaxDeleteObjects
ErrInvalidPartNumberMarker
ErrInvalidPart
+ ErrInvalidRange
ErrInternalError
ErrInvalidCopyDest
ErrInvalidCopySource
@@ -98,6 +103,10 @@ const (
ErrPreconditionFailed
ErrExistingObjectIsDirectory
+ ErrExistingObjectIsFile
+
+ ErrTooManyRequest
+ ErrRequestBytesExceed
)
// error code to APIError structure, these fields carry respective
@@ -153,6 +162,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Argument max-parts must be an integer between 0 and 2147483647",
HTTPStatusCode: http.StatusBadRequest,
},
+ ErrInvalidMaxDeleteObjects: {
+ Code: "InvalidArgument",
+ Description: "Argument objects can contain a list of up to 1000 keys",
+ HTTPStatusCode: http.StatusBadRequest,
+ },
ErrInvalidPartNumberMarker: {
Code: "InvalidArgument",
Description: "Argument partNumberMarker must be an integer.",
@@ -163,6 +177,21 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "The specified bucket does not exist",
HTTPStatusCode: http.StatusNotFound,
},
+ ErrNoSuchBucketPolicy: {
+ Code: "NoSuchBucketPolicy",
+ Description: "The bucket policy does not exist",
+ HTTPStatusCode: http.StatusNotFound,
+ },
+ ErrNoSuchCORSConfiguration: {
+ Code: "NoSuchCORSConfiguration",
+ Description: "The CORS configuration does not exist",
+ HTTPStatusCode: http.StatusNotFound,
+ },
+ ErrNoSuchLifecycleConfiguration: {
+ Code: "NoSuchLifecycleConfiguration",
+ Description: "The lifecycle configuration does not exist",
+ HTTPStatusCode: http.StatusNotFound,
+ },
ErrNoSuchKey: {
Code: "NoSuchKey",
Description: "The specified key does not exist.",
@@ -196,7 +225,7 @@ var errorCodeResponse = map[ErrorCode]APIError{
HTTPStatusCode: http.StatusBadRequest,
},
ErrInvalidTag: {
- Code: "InvalidArgument",
+ Code: "InvalidTag",
Description: "The Tag value you have provided is invalid",
HTTPStatusCode: http.StatusBadRequest,
},
@@ -345,6 +374,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Invalid Request",
HTTPStatusCode: http.StatusBadRequest,
},
+ ErrInvalidRange: {
+ Code: "InvalidRange",
+ Description: "The requested range is not satisfiable",
+ HTTPStatusCode: http.StatusRequestedRangeNotSatisfiable,
+ },
ErrAuthNotSetup: {
Code: "InvalidRequest",
Description: "Signed request requires setting up SeaweedFS S3 authentication",
@@ -365,6 +399,21 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "Existing Object is a directory.",
HTTPStatusCode: http.StatusConflict,
},
+ ErrExistingObjectIsFile: {
+ Code: "ExistingObjectIsFile",
+ Description: "Existing Object is a file.",
+ HTTPStatusCode: http.StatusConflict,
+ },
+ ErrTooManyRequest: {
+ Code: "ErrTooManyRequest",
+ Description: "Too many simultaneous request count",
+ HTTPStatusCode: http.StatusTooManyRequests,
+ },
+ ErrRequestBytesExceed: {
+ Code: "ErrRequestBytesExceed",
+ Description: "Simultaneous request bytes exceed limitations",
+ HTTPStatusCode: http.StatusTooManyRequests,
+ },
}
// GetAPIError provides API Error for input API error code.
diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go
index b667b32a0..003807a25 100644
--- a/weed/s3api/stats.go
+++ b/weed/s3api/stats.go
@@ -1,8 +1,8 @@
package s3api
import (
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strconv"
"time"
@@ -28,11 +28,12 @@ func (r *StatusRecorder) Flush() {
func track(f http.HandlerFunc, action string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Server", "SeaweedFS S3 "+util.VERSION)
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ w.Header().Set("Server", "SeaweedFS S3")
recorder := NewStatusResponseWriter(w)
start := time.Now()
f(recorder, r)
- stats_collect.S3RequestHistogram.WithLabelValues(action).Observe(time.Since(start).Seconds())
- stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status)).Inc()
+ stats_collect.S3RequestHistogram.WithLabelValues(action, bucket).Observe(time.Since(start).Seconds())
+ stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status), bucket).Inc()
}
}
diff --git a/weed/s3api/tags.go b/weed/s3api/tags.go
index 9ff7d1fba..979e5a80c 100644
--- a/weed/s3api/tags.go
+++ b/weed/s3api/tags.go
@@ -14,8 +14,9 @@ type TagSet struct {
}
type Tagging struct {
- XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Tagging"`
+ XMLName xml.Name `xml:"Tagging"`
TagSet TagSet `xml:"TagSet"`
+ Xmlns string `xml:"xmlns,attr"`
}
func (t *Tagging) ToTags() map[string]string {
@@ -27,7 +28,7 @@ func (t *Tagging) ToTags() map[string]string {
}
func FromTags(tags map[string]string) (t *Tagging) {
- t = &Tagging{}
+ t = &Tagging{Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/"}
for k, v := range tags {
t.TagSet.Tag = append(t.TagSet.Tag, Tag{
Key: k,
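
Moving the namespace out of XMLName into an explicit xmlns attribute keeps marshalled tagging responses namespaced while letting unmarshalling accept un-namespaced input. A test-file sketch of the resulting shape (the output comment is approximate):

```go
package s3api

import (
	"encoding/xml"
	"fmt"
)

// Sketch: FromTags stamps the namespace attribute, so marshalled output is
// namespaced even though the Tagging type itself no longer requires it.
func ExampleFromTags() {
	out, _ := xml.Marshal(FromTags(map[string]string{"env": "prod"}))
	fmt.Println(string(out))
	// <Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><TagSet>...</TagSet></Tagging>
}
```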
diff --git a/weed/s3api/tags_test.go b/weed/s3api/tags_test.go
index 52adb36c1..d8beb1922 100644
--- a/weed/s3api/tags_test.go
+++ b/weed/s3api/tags_test.go
@@ -32,6 +32,7 @@ func TestXMLUnmarshall(t *testing.T) {
func TestXMLMarshall(t *testing.T) {
tags := &Tagging{
+ Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/",
TagSet: TagSet{
[]Tag{
{