author    bingoohuang <bingoo.huang@gmail.com>    2019-07-16 11:13:23 +0800
committer GitHub <noreply@github.com>             2019-07-16 11:13:23 +0800
commit    d19bbee98d89ec6cd603572bd9c5d55749610e61 (patch)
tree      8d760dcee4dfcb4404af90b7d5e64def4549b4cc /weed/s3api
parent    01060c992591f412b0d5e180bde29991747a9462 (diff)
parent    5b5e443d5b9985fd77f3d5470f1d5885a88bf2b9 (diff)
keep update from original (#1)
keep update from original
Diffstat (limited to 'weed/s3api')
-rw-r--r--  weed/s3api/custom_types.go                      |  3
-rw-r--r--  weed/s3api/filer_multipart.go                   | 51
-rw-r--r--  weed/s3api/filer_multipart_test.go              | 26
-rw-r--r--  weed/s3api/filer_util.go                        | 39
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go             | 21
-rw-r--r--  weed/s3api/s3api_bucket_handlers_test.go        | 39
-rw-r--r--  weed/s3api/s3api_handlers.go                    | 16
-rw-r--r--  weed/s3api/s3api_object_multipart_handlers.go   | 27
-rw-r--r--  weed/s3api/s3api_objects_list_handlers.go       | 64
-rw-r--r--  weed/s3api/s3api_objects_list_handlers_test.go  | 38
-rw-r--r--  weed/s3api/s3api_server.go                      |  2
-rw-r--r--  weed/s3api/s3api_xsd_generated.go               | 55
12 files changed, 256 insertions(+), 125 deletions(-)
diff --git a/weed/s3api/custom_types.go b/weed/s3api/custom_types.go
new file mode 100644
index 000000000..569dfc3ac
--- /dev/null
+++ b/weed/s3api/custom_types.go
@@ -0,0 +1,3 @@
+package s3api
+
+const s3TimeFormat = "2006-01-02T15:04:05.999Z07:00"
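
Note: the layout string uses Go's reference time (Mon Jan 2 15:04:05 MST 2006). ".999" emits fractional seconds only when they are non-zero, and "Z07:00" prints a literal "Z" for UTC offsets, which is the ISO 8601 shape S3 clients expect. A minimal standalone sketch of the formatting behavior (values illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    const s3TimeFormat = "2006-01-02T15:04:05.999Z07:00"

    func main() {
        // UTC renders as a trailing "Z"; zero fractional seconds are dropped entirely.
        t := time.Date(2011, 4, 9, 12, 34, 49, 0, time.UTC)
        fmt.Println(t.Format(s3TimeFormat)) // 2011-04-09T12:34:49Z

        // Non-zero fractional seconds keep up to three digits, trailing zeros trimmed.
        t = time.Date(2011, 4, 9, 12, 34, 49, 120000000, time.UTC)
        fmt.Println(t.Format(s3TimeFormat)) // 2011-04-09T12:34:49.12Z
    }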
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 73be496d9..4de1dda36 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -1,6 +1,8 @@
package s3api
import (
+ "context"
+ "encoding/xml"
"fmt"
"path/filepath"
"strconv"
@@ -16,14 +18,15 @@ import (
)
type InitiateMultipartUploadResult struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
s3.CreateMultipartUploadOutput
}
-func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code ErrorCode) {
+func (s3a *S3ApiServer) createMultipartUpload(ctx context.Context, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code ErrorCode) {
uploadId, _ := uuid.NewV4()
uploadIdString := uploadId.String()
- if err := s3a.mkdir(s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) {
+ if err := s3a.mkdir(ctx, s3a.genUploadsFolder(*input.Bucket), uploadIdString, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
@@ -34,9 +37,9 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
}
output = &InitiateMultipartUploadResult{
- s3.CreateMultipartUploadOutput{
+ CreateMultipartUploadOutput: s3.CreateMultipartUploadOutput{
Bucket: input.Bucket,
- Key: input.Key,
+ Key: objectKey(input.Key),
UploadId: aws.String(uploadIdString),
},
}
@@ -45,14 +48,15 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
}
type CompleteMultipartUploadResult struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
s3.CompleteMultipartUploadOutput
}
-func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code ErrorCode) {
+func (s3a *S3ApiServer) completeMultipartUpload(ctx context.Context, input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code ErrorCode) {
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
- entries, err := s3a.list(uploadDirectory, "", "", false, 0)
+ entries, err := s3a.list(ctx, uploadDirectory, "", "", false, 0)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, ErrNoSuchUpload
@@ -65,7 +69,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
for _, chunk := range entry.Chunks {
p := &filer_pb.FileChunk{
- FileId: chunk.FileId,
+ FileId: chunk.GetFileIdString(),
Offset: offset,
Size: chunk.Size,
Mtime: chunk.Mtime,
@@ -87,7 +91,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
}
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
- err = s3a.mkFile(dirName, entryName, finalParts)
+ err = s3a.mkFile(ctx, dirName, entryName, finalParts)
if err != nil {
glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
@@ -95,29 +99,30 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
}
output = &CompleteMultipartUploadResult{
- s3.CompleteMultipartUploadOutput{
+ CompleteMultipartUploadOutput: s3.CompleteMultipartUploadOutput{
+ Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer, dirName, entryName)),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer2.ETag(finalParts) + "\""),
- Key: input.Key,
+ Key: objectKey(input.Key),
},
}
- if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, false, true); err != nil {
+ if err = s3a.rm(ctx, s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, false, true); err != nil {
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
}
return
}
-func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code ErrorCode) {
+func (s3a *S3ApiServer) abortMultipartUpload(ctx context.Context, input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code ErrorCode) {
- exists, err := s3a.exists(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true)
+ exists, err := s3a.exists(ctx, s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true)
if err != nil {
glog.V(1).Infof("bucket %s abort upload %s: %v", *input.Bucket, *input.UploadId, err)
return nil, ErrNoSuchUpload
}
if exists {
- err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, true, true)
+ err = s3a.rm(ctx, s3a.genUploadsFolder(*input.Bucket), *input.UploadId, true, true, true)
}
if err != nil {
glog.V(1).Infof("bucket %s remove upload %s: %v", *input.Bucket, *input.UploadId, err)
@@ -128,13 +133,14 @@ func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput
}
type ListMultipartUploadsResult struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult"`
s3.ListMultipartUploadsOutput
}
-func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code ErrorCode) {
+func (s3a *S3ApiServer) listMultipartUploads(ctx context.Context, input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code ErrorCode) {
output = &ListMultipartUploadsResult{
- s3.ListMultipartUploadsOutput{
+ ListMultipartUploadsOutput: s3.ListMultipartUploadsOutput{
Bucket: input.Bucket,
Delimiter: input.Delimiter,
EncodingType: input.EncodingType,
@@ -144,7 +150,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
},
}
- entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads))
+ entries, err := s3a.list(ctx, s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads))
if err != nil {
glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, err)
return
@@ -154,7 +160,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
if entry.Extended != nil {
key := entry.Extended["key"]
output.Uploads = append(output.Uploads, &s3.MultipartUpload{
- Key: aws.String(string(key)),
+ Key: objectKey(aws.String(string(key))),
UploadId: aws.String(entry.Name),
})
}
@@ -164,21 +170,22 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
}
type ListPartsResult struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult"`
s3.ListPartsOutput
}
-func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code ErrorCode) {
+func (s3a *S3ApiServer) listObjectParts(ctx context.Context, input *s3.ListPartsInput) (output *ListPartsResult, code ErrorCode) {
output = &ListPartsResult{
- s3.ListPartsOutput{
+ ListPartsOutput: s3.ListPartsOutput{
Bucket: input.Bucket,
- Key: input.Key,
+ Key: objectKey(input.Key),
UploadId: input.UploadId,
MaxParts: input.MaxParts, // the maximum number of parts to return.
PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
},
}
- entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId,
+ entries, err := s3a.list(ctx, s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId,
"", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, int(*input.MaxParts))
if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go
new file mode 100644
index 000000000..835665dd6
--- /dev/null
+++ b/weed/s3api/filer_multipart_test.go
@@ -0,0 +1,26 @@
+package s3api
+
+import (
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "testing"
+)
+
+func TestInitiateMultipartUploadResult(t *testing.T) {
+
+ expected := `<?xml version="1.0" encoding="UTF-8"?>
+<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Bucket>example-bucket</Bucket><Key>example-object</Key><UploadId>VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA</UploadId></InitiateMultipartUploadResult>`
+ response := &InitiateMultipartUploadResult{
+ CreateMultipartUploadOutput: s3.CreateMultipartUploadOutput{
+ Bucket: aws.String("example-bucket"),
+ Key: aws.String("example-object"),
+ UploadId: aws.String("VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA"),
+ },
+ }
+
+ encoded := string(encodeResponse(response))
+ if encoded != expected {
+ t.Errorf("unexpected output: %s\nexpecting:%s", encoded, expected)
+ }
+
+}
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 40c5a3e26..84e3050cd 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -4,14 +4,15 @@ import (
"context"
"fmt"
"os"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
-func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
- return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+func (s3a *S3ApiServer) mkdir(ctx context.Context, parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
+ return s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
entry := &filer_pb.Entry{
Name: dirName,
@@ -35,7 +36,7 @@ func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn fun
}
glog.V(1).Infof("mkdir: %v", request)
- if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ if _, err := client.CreateEntry(ctx, request); err != nil {
return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
}
@@ -43,8 +44,8 @@ func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn fun
})
}
-func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error {
- return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+func (s3a *S3ApiServer) mkFile(ctx context.Context, parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error {
+ return s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
entry := &filer_pb.Entry{
Name: fileName,
@@ -65,7 +66,7 @@ func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chun
}
glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
- if _, err := client.CreateEntry(context.Background(), request); err != nil {
+ if _, err := client.CreateEntry(ctx, request); err != nil {
return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
}
@@ -73,9 +74,9 @@ func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chun
})
}
-func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit int) (entries []*filer_pb.Entry, err error) {
+func (s3a *S3ApiServer) list(ctx context.Context, parentDirectoryPath, prefix, startFrom string, inclusive bool, limit int) (entries []*filer_pb.Entry, err error) {
- err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.ListEntriesRequest{
Directory: parentDirectoryPath,
@@ -86,7 +87,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
}
glog.V(4).Infof("read directory: %v", request)
- resp, err := client.ListEntries(context.Background(), request)
+ resp, err := client.ListEntries(ctx, request)
if err != nil {
return fmt.Errorf("list dir %v: %v", parentDirectoryPath, err)
}
@@ -100,11 +101,9 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
}
-func (s3a *S3ApiServer) rm(parentDirectoryPath string, entryName string, isDirectory, isDeleteData, isRecursive bool) error {
+func (s3a *S3ApiServer) rm(ctx context.Context, parentDirectoryPath string, entryName string, isDirectory, isDeleteData, isRecursive bool) error {
- return s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- ctx := context.Background()
+ return s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: parentDirectoryPath,
@@ -123,11 +122,9 @@ func (s3a *S3ApiServer) rm(parentDirectoryPath string, entryName string, isDirec
}
-func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
-
- err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+func (s3a *S3ApiServer) exists(ctx context.Context, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
- ctx := context.Background()
+ err = s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDirectoryPath,
@@ -147,3 +144,11 @@ func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isD
return
}
+
+func objectKey(key *string) *string {
+ if strings.HasPrefix(*key, "/") {
+ t := (*key)[1:]
+ return &t
+ }
+ return key
+}
\ No newline at end of file
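
Note: objectKey strips the leading "/" that the router's object capture includes, so keys echoed in responses match what the client sent. A small test-style sketch of its behavior (hypothetical test, not part of the commit):

    package s3api

    import (
        "testing"

        "github.com/aws/aws-sdk-go/aws"
    )

    func TestObjectKeySketch(t *testing.T) {
        // A leading slash is stripped so responses echo the key as the client sent it.
        if got := *objectKey(aws.String("/photos/2019/a.jpg")); got != "photos/2019/a.jpg" {
            t.Errorf("leading slash not stripped: %q", got)
        }
        // Keys without a leading slash pass through untouched.
        if got := *objectKey(aws.String("photos/2019/a.jpg")); got != "photos/2019/a.jpg" {
            t.Errorf("unexpected change: %q", got)
        }
    }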
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 1d319e354..492d94616 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -2,6 +2,7 @@ package s3api
import (
"context"
+ "encoding/xml"
"fmt"
"math"
"net/http"
@@ -21,15 +22,16 @@ var (
)
type ListAllMyBucketsResult struct {
- Buckets []*s3.Bucket `xml:"Buckets>Bucket"`
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult"`
Owner *s3.Owner
+ Buckets []*s3.Bucket `xml:"Buckets>Bucket"`
}
func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
var response ListAllMyBucketsResult
- entries, err := s3a.list(s3a.option.BucketsPath, "", "", false, math.MaxInt32)
+ entries, err := s3a.list(context.Background(), s3a.option.BucketsPath, "", "", false, math.MaxInt32)
if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
@@ -63,7 +65,7 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
bucket := vars["bucket"]
// create the folder for bucket, but lazily create actual collection
- if err := s3a.mkdir(s3a.option.BucketsPath, bucket, nil); err != nil {
+ if err := s3a.mkdir(context.Background(), s3a.option.BucketsPath, bucket, nil); err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
return
}
@@ -76,9 +78,8 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
vars := mux.Vars(r)
bucket := vars["bucket"]
- err := s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- ctx := context.Background()
+ ctx := context.Background()
+ err := s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
// delete collection
deleteCollectionRequest := &filer_pb.DeleteCollectionRequest{
@@ -93,7 +94,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
return nil
})
- err = s3a.rm(s3a.option.BucketsPath, bucket, true, false, true)
+ err = s3a.rm(ctx, s3a.option.BucketsPath, bucket, true, false, true)
if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
@@ -108,7 +109,9 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request
vars := mux.Vars(r)
bucket := vars["bucket"]
- err := s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ ctx := context.Background()
+
+ err := s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: s3a.option.BucketsPath,
@@ -116,7 +119,7 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request
}
glog.V(1).Infof("lookup bucket: %v", request)
- if _, err := client.LookupDirectoryEntry(context.Background(), request); err != nil {
+ if _, err := client.LookupDirectoryEntry(ctx, request); err != nil {
return fmt.Errorf("lookup bucket %s/%s: %v", s3a.option.BucketsPath, bucket, err)
}
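
Note: in the reshuffled ListAllMyBucketsResult, XMLName supplies the namespaced root element and the Buckets>Bucket path tag nests every <Bucket> under a single <Buckets> wrapper, which is the shape the new test asserts. A simplified standalone sketch of the path-tag behavior (struct is illustrative):

    package main

    import (
        "encoding/xml"
        "fmt"
    )

    type listAllMyBuckets struct {
        XMLName xml.Name `xml:"ListAllMyBucketsResult"`
        Buckets []string `xml:"Buckets>Bucket"` // "parent>child" nests each item under one wrapper
    }

    func main() {
        out, _ := xml.Marshal(listAllMyBuckets{Buckets: []string{"test1", "test2"}})
        fmt.Println(string(out))
        // <ListAllMyBucketsResult><Buckets><Bucket>test1</Bucket><Bucket>test2</Bucket></Buckets></ListAllMyBucketsResult>
    }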
diff --git a/weed/s3api/s3api_bucket_handlers_test.go b/weed/s3api/s3api_bucket_handlers_test.go
new file mode 100644
index 000000000..7ab04830b
--- /dev/null
+++ b/weed/s3api/s3api_bucket_handlers_test.go
@@ -0,0 +1,39 @@
+package s3api
+
+import (
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/s3"
+)
+
+func TestListBucketsHandler(t *testing.T) {
+
+ expected := `<?xml version="1.0" encoding="UTF-8"?>
+<ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Owner><DisplayName></DisplayName><ID></ID></Owner><Buckets><Bucket><CreationDate>2011-04-09T12:34:49Z</CreationDate><Name>test1</Name></Bucket><Bucket><CreationDate>2011-02-09T12:34:49Z</CreationDate><Name>test2</Name></Bucket></Buckets></ListAllMyBucketsResult>`
+ var response ListAllMyBucketsResult
+
+ var buckets []*s3.Bucket
+ buckets = append(buckets, &s3.Bucket{
+ Name: aws.String("test1"),
+ CreationDate: aws.Time(time.Date(2011, 4, 9, 12, 34, 49, 0, time.UTC)),
+ })
+ buckets = append(buckets, &s3.Bucket{
+ Name: aws.String("test2"),
+ CreationDate: aws.Time(time.Date(2011, 2, 9, 12, 34, 49, 0, time.UTC)),
+ })
+
+ response = ListAllMyBucketsResult{
+ Owner: &s3.Owner{
+ ID: aws.String(""),
+ DisplayName: aws.String(""),
+ },
+ Buckets: buckets,
+ }
+
+ encoded := string(encodeResponse(response))
+ if encoded != expected {
+ t.Errorf("unexpected output: %s\nexpecting:%s", encoded, expected)
+ }
+}
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 286398310..127be07e3 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -2,12 +2,14 @@ package s3api
import (
"bytes"
+ "context"
"encoding/base64"
"encoding/xml"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
"net/http"
"net/url"
"time"
@@ -35,17 +37,13 @@ func encodeResponse(response interface{}) []byte {
return bytesBuffer.Bytes()
}
-func (s3a *S3ApiServer) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (s3a *S3ApiServer) withFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
- grpcConnection, err := util.GrpcDial(s3a.option.FilerGrpcAddress)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", s3a.option.FilerGrpcAddress, err)
- }
- defer grpcConnection.Close()
-
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption)
- return fn(client)
}
// If none of the http routes match respond with MethodNotAllowed
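
Note: withFilerClient no longer dials and closes a gRPC connection per request; it delegates to util.WithCachedGrpcClient, passing the filer address and the new GrpcDialOption. A rough sketch of the caching idea, assuming a simple map keyed by address (the real helper's internals may differ):

    package grpccache

    import (
        "context"
        "sync"

        "google.golang.org/grpc"
    )

    var (
        clientsLock sync.Mutex
        clients     = make(map[string]*grpc.ClientConn)
    )

    // withCachedGrpcClient dials each address once and reuses the connection on
    // later calls; eviction and error recovery are omitted in this sketch.
    func withCachedGrpcClient(ctx context.Context, fn func(*grpc.ClientConn) error,
        address string, opt grpc.DialOption) error {
        clientsLock.Lock()
        conn, ok := clients[address]
        if !ok {
            var err error
            conn, err = grpc.DialContext(ctx, address, opt)
            if err != nil {
                clientsLock.Unlock()
                return err
            }
            clients[address] = conn
        }
        clientsLock.Unlock()
        return fn(conn)
    }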
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index 267d126c5..72a25e4a5 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -1,6 +1,7 @@
package s3api
import (
+ "context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
@@ -25,9 +26,9 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
bucket = vars["bucket"]
object = vars["object"]
- response, errCode := s3a.createMultipartUpload(&s3.CreateMultipartUploadInput{
+ response, errCode := s3a.createMultipartUpload(context.Background(), &s3.CreateMultipartUploadInput{
Bucket: aws.String(bucket),
- Key: aws.String(object),
+ Key: objectKey(aws.String(object)),
})
if errCode != ErrNone {
@@ -50,9 +51,9 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
// Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query())
- response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
+ response, errCode := s3a.completeMultipartUpload(context.Background(), &s3.CompleteMultipartUploadInput{
Bucket: aws.String(bucket),
- Key: aws.String(object),
+ Key: objectKey(aws.String(object)),
UploadId: aws.String(uploadID),
})
@@ -76,9 +77,9 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
// Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query())
- response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
+ response, errCode := s3a.abortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
Bucket: aws.String(bucket),
- Key: aws.String(object),
+ Key: objectKey(aws.String(object)),
UploadId: aws.String(uploadID),
})
@@ -111,7 +112,7 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
}
}
- response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
+ response, errCode := s3a.listMultipartUploads(context.Background(), &s3.ListMultipartUploadsInput{
Bucket: aws.String(bucket),
Delimiter: aws.String(delimiter),
EncodingType: aws.String(encodingType),
@@ -148,9 +149,9 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
return
}
- response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
+ response, errCode := s3a.listObjectParts(context.Background(), &s3.ListPartsInput{
Bucket: aws.String(bucket),
- Key: aws.String(object),
+ Key: objectKey(aws.String(object)),
MaxParts: aws.Int64(int64(maxParts)),
PartNumberMarker: aws.Int64(int64(partNumberMarker)),
UploadId: aws.String(uploadID),
@@ -174,8 +175,10 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
rAuthType := getRequestAuthType(r)
+ ctx := context.Background()
+
uploadID := r.URL.Query().Get("uploadId")
- exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
+ exists, err := s3a.exists(ctx, s3a.genUploadsFolder(bucket), uploadID, true)
if !exists {
writeErrorResponse(w, ErrNoSuchUpload, r.URL)
return
@@ -197,8 +200,8 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
dataReader = newSignV4ChunkedReader(r)
}
- uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
- s3a.option.Filer, s3a.genUploadsFolder(bucket), uploadID, partID-1)
+ uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
+ s3a.option.Filer, s3a.genUploadsFolder(bucket), uploadID, partID-1, bucket)
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
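
Note: the part upload URL now appends collection=<bucket>, so the filer assigns each uploaded part to the bucket's collection instead of the default one. A sketch of the URL this format string produces (host and uploads-folder layout are illustrative stand-ins):

    package main

    import "fmt"

    func main() {
        filer := "localhost:8888"
        uploadsFolder := "/buckets/mybucket/.uploads" // stand-in for genUploadsFolder(bucket)
        uploadID := "VXBsb2FkIElE"
        partID := 1
        bucket := "mybucket"

        uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
            filer, uploadsFolder, uploadID, partID-1, bucket)
        fmt.Println(uploadUrl)
        // http://localhost:8888/buckets/mybucket/.uploads/VXBsb2FkIElE/0000.part?collection=mybucket
    }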
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index d751a3b1d..4053913fb 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -7,10 +7,9 @@ import (
"net/url"
"path/filepath"
"strconv"
+ "strings"
"time"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/s3"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -46,7 +45,9 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
marker = startAfter
}
- response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker)
+ ctx := context.Background()
+
+ response, err := s3a.listFilerEntries(ctx, bucket, originalPrefix, maxKeys, marker)
if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
@@ -64,6 +65,8 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
vars := mux.Vars(r)
bucket := vars["bucket"]
+ ctx := context.Background()
+
originalPrefix, marker, delimiter, maxKeys := getListObjectsV1Args(r.URL.Query())
if maxKeys < 0 {
@@ -75,7 +78,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
return
}
- response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker)
+ response, err := s3a.listFilerEntries(ctx, bucket, originalPrefix, maxKeys, marker)
if err != nil {
writeErrorResponse(w, ErrInternalError, r.URL)
@@ -85,13 +88,16 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
writeSuccessResponseXML(w, encodeResponse(response))
}
-func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys int, marker string) (response *s3.ListObjectsOutput, err error) {
+func (s3a *S3ApiServer) listFilerEntries(ctx context.Context, bucket, originalPrefix string, maxKeys int, marker string) (response ListBucketResult, err error) {
// convert full path prefix into directory name and prefix for entry name
dir, prefix := filepath.Split(originalPrefix)
+ if strings.HasPrefix(dir, "/") {
+ dir = dir[1:]
+ }
// check filer
- err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = s3a.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.ListEntriesRequest{
Directory: fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, dir),
@@ -101,13 +107,13 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys
InclusiveStartFrom: false,
}
- resp, err := client.ListEntries(context.Background(), request)
+ resp, err := client.ListEntries(ctx, request)
if err != nil {
return fmt.Errorf("list buckets: %v", err)
}
- var contents []*s3.Object
- var commonPrefixes []*s3.CommonPrefix
+ var contents []ListEntry
+ var commonPrefixes []PrefixEntry
var counter int
var lastEntryName string
var isTruncated bool
@@ -119,37 +125,37 @@ func (s3a *S3ApiServer) listFilerEntries(bucket, originalPrefix string, maxKeys
}
lastEntryName = entry.Name
if entry.IsDirectory {
- commonPrefixes = append(commonPrefixes, &s3.CommonPrefix{
- Prefix: aws.String(fmt.Sprintf("%s%s/", dir, entry.Name)),
+ commonPrefixes = append(commonPrefixes, PrefixEntry{
+ Prefix: fmt.Sprintf("%s%s/", dir, entry.Name),
})
} else {
- contents = append(contents, &s3.Object{
- Key: aws.String(fmt.Sprintf("%s%s", dir, entry.Name)),
- LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0)),
- ETag: aws.String("\"" + filer2.ETag(entry.Chunks) + "\""),
- Size: aws.Int64(int64(filer2.TotalSize(entry.Chunks))),
- Owner: &s3.Owner{
- ID: aws.String("bcaf161ca5fb16fd081034f"),
- DisplayName: aws.String("webfile"),
+ contents = append(contents, ListEntry{
+ Key: fmt.Sprintf("%s%s", dir, entry.Name),
+ LastModified: time.Unix(entry.Attributes.Mtime, 0),
+ ETag: "\"" + filer2.ETag(entry.Chunks) + "\"",
+ Size: int64(filer2.TotalSize(entry.Chunks)),
+ Owner: CanonicalUser{
+ ID: fmt.Sprintf("%x", entry.Attributes.Uid),
+ DisplayName: entry.Attributes.UserName,
},
- StorageClass: aws.String("STANDARD"),
+ StorageClass: "STANDARD",
})
}
}
- response = &s3.ListObjectsOutput{
- Name: aws.String(bucket),
- Prefix: aws.String(originalPrefix),
- Marker: aws.String(marker),
- NextMarker: aws.String(lastEntryName),
- MaxKeys: aws.Int64(int64(maxKeys)),
- Delimiter: aws.String("/"),
- IsTruncated: aws.Bool(isTruncated),
+ response = ListBucketResult{
+ Name: bucket,
+ Prefix: originalPrefix,
+ Marker: marker,
+ NextMarker: lastEntryName,
+ MaxKeys: maxKeys,
+ Delimiter: "/",
+ IsTruncated: isTruncated,
Contents: contents,
CommonPrefixes: commonPrefixes,
}
- glog.V(4).Infof("read directory: %v, found: %v", request, counter)
+ glog.V(4).Infof("read directory: %v, found: %v, %+v", request, counter, response)
return nil
})
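
Note: listFilerEntries now builds the package's own ListBucketResult (plain Go values instead of the SDK's pointer-heavy s3.ListObjectsOutput), and it splits the requested prefix into the directory to list and the entry-name prefix, trimming a leading "/" so the directory joins cleanly under the bucket path. A small standalone sketch of that prefix handling (paths illustrative):

    package main

    import (
        "fmt"
        "path/filepath"
        "strings"
    )

    func main() {
        originalPrefix := "photos/2019/img" // e.g. from ?prefix=photos/2019/img
        dir, prefix := filepath.Split(originalPrefix)
        if strings.HasPrefix(dir, "/") {
            dir = dir[1:]
        }
        fmt.Printf("dir=%q prefix=%q\n", dir, prefix) // dir="photos/2019/" prefix="img"

        // Directory actually listed on the filer:
        fmt.Println(fmt.Sprintf("%s/%s/%s", "/buckets", "mybucket", dir))
        // /buckets/mybucket/photos/2019/
    }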
diff --git a/weed/s3api/s3api_objects_list_handlers_test.go b/weed/s3api/s3api_objects_list_handlers_test.go
new file mode 100644
index 000000000..7b87b32fb
--- /dev/null
+++ b/weed/s3api/s3api_objects_list_handlers_test.go
@@ -0,0 +1,38 @@
+package s3api
+
+import (
+ "testing"
+ "time"
+)
+
+func TestListObjectsHandler(t *testing.T) {
+
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
+
+ expected := `<?xml version="1.0" encoding="UTF-8"?>
+<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Name>test_container</Name><Prefix></Prefix><Marker></Marker><MaxKeys>1000</MaxKeys><IsTruncated>false</IsTruncated><Contents><Key>1.zip</Key><ETag>&#34;4397da7a7649e8085de9916c240e8166&#34;</ETag><Size>1234567</Size><Owner><ID>65a011niqo39cdf8ec533ec3d1ccaafsa932</ID></Owner><StorageClass>STANDARD</StorageClass><LastModified>2011-04-09T12:34:49Z</LastModified></Contents></ListBucketResult>`
+
+ response := ListBucketResult{
+ Name: "test_container",
+ Prefix: "",
+ Marker: "",
+ NextMarker: "",
+ MaxKeys: 1000,
+ IsTruncated: false,
+ Contents: []ListEntry{{
+ Key: "1.zip",
+ LastModified: time.Date(2011, 4, 9, 12, 34, 49, 0, time.UTC),
+ ETag: "\"4397da7a7649e8085de9916c240e8166\"",
+ Size: 1234567,
+ Owner: CanonicalUser{
+ ID: "65a011niqo39cdf8ec533ec3d1ccaafsa932",
+ },
+ StorageClass: "STANDARD",
+ }},
+ }
+
+ encoded := string(encodeResponse(response))
+ if encoded != expected {
+ t.Errorf("unexpected output: %s\nexpecting:%s", encoded, expected)
+ }
+}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index db798a546..24458592d 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -8,6 +8,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
"github.com/gorilla/mux"
+ "google.golang.org/grpc"
"net/http"
)
@@ -16,6 +17,7 @@ type S3ApiServerOption struct {
FilerGrpcAddress string
DomainName string
BucketsPath string
+ GrpcDialOption grpc.DialOption
}
type S3ApiServer struct {
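
Note: S3ApiServerOption now carries a grpc.DialOption, so TLS versus plaintext dialing is chosen by whoever wires up the server rather than being hard-coded in withFilerClient. A hedged example of filling the option for a local, insecure setup (values illustrative; the struct has more fields than this hunk shows):

    package main

    import (
        "google.golang.org/grpc"

        "github.com/chrislusf/seaweedfs/weed/s3api"
    )

    func main() {
        option := &s3api.S3ApiServerOption{
            FilerGrpcAddress: "localhost:18888",
            DomainName:       "localhost",
            BucketsPath:      "/buckets",
            GrpcDialOption:   grpc.WithInsecure(), // plaintext gRPC; use TLS credentials in production
        }
        _ = option
    }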
diff --git a/weed/s3api/s3api_xsd_generated.go b/weed/s3api/s3api_xsd_generated.go
index df07f3fea..573c09ede 100644
--- a/weed/s3api/s3api_xsd_generated.go
+++ b/weed/s3api/s3api_xsd_generated.go
@@ -25,8 +25,8 @@ type BucketLoggingStatus struct {
}
type CanonicalUser struct {
- ID string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ID"`
- DisplayName string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DisplayName,omitempty"`
+ ID string `xml:"ID"`
+ DisplayName string `xml:"DisplayName,omitempty"`
}
type CopyObject struct {
@@ -506,15 +506,15 @@ func (t *ListAllMyBuckets) UnmarshalXML(d *xml.Decoder, start xml.StartElement)
}
type ListAllMyBucketsEntry struct {
- Name string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Name"`
- CreationDate time.Time `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CreationDate"`
+ Name string `xml:"Name"`
+ CreationDate time.Time `xml:"CreationDate"`
}
func (t *ListAllMyBucketsEntry) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
type T ListAllMyBucketsEntry
var layout struct {
*T
- CreationDate *xsdDateTime `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CreationDate"`
+ CreationDate *xsdDateTime `xml:"CreationDate"`
}
layout.T = (*T)(t)
layout.CreationDate = (*xsdDateTime)(&layout.T.CreationDate)
@@ -524,7 +524,7 @@ func (t *ListAllMyBucketsEntry) UnmarshalXML(d *xml.Decoder, start xml.StartElem
type T ListAllMyBucketsEntry
var overlay struct {
*T
- CreationDate *xsdDateTime `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CreationDate"`
+ CreationDate *xsdDateTime `xml:"CreationDate"`
}
overlay.T = (*T)(t)
overlay.CreationDate = (*xsdDateTime)(&overlay.T.CreationDate)
@@ -532,7 +532,7 @@ func (t *ListAllMyBucketsEntry) UnmarshalXML(d *xml.Decoder, start xml.StartElem
}
type ListAllMyBucketsList struct {
- Bucket []ListAllMyBucketsEntry `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Bucket,omitempty"`
+ Bucket []ListAllMyBucketsEntry `xml:"Bucket,omitempty"`
}
type ListAllMyBucketsResponse struct {
@@ -577,32 +577,33 @@ type ListBucketResponse struct {
}
type ListBucketResult struct {
- Metadata []MetadataEntry `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Metadata,omitempty"`
- Name string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Name"`
- Prefix string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Prefix"`
- Marker string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Marker"`
- NextMarker string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ NextMarker,omitempty"`
- MaxKeys int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ MaxKeys"`
- Delimiter string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Delimiter,omitempty"`
- IsTruncated bool `xml:"http://s3.amazonaws.com/doc/2006-03-01/ IsTruncated"`
- Contents []ListEntry `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Contents,omitempty"`
- CommonPrefixes []PrefixEntry `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CommonPrefixes,omitempty"`
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
+ Metadata []MetadataEntry `xml:"Metadata,omitempty"`
+ Name string `xml:"Name"`
+ Prefix string `xml:"Prefix"`
+ Marker string `xml:"Marker"`
+ NextMarker string `xml:"NextMarker,omitempty"`
+ MaxKeys int `xml:"MaxKeys"`
+ Delimiter string `xml:"Delimiter,omitempty"`
+ IsTruncated bool `xml:"IsTruncated"`
+ Contents []ListEntry `xml:"Contents,omitempty"`
+ CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
}
type ListEntry struct {
- Key string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Key"`
- LastModified time.Time `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LastModified"`
- ETag string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ETag"`
- Size int64 `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Size"`
- Owner CanonicalUser `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Owner,omitempty"`
- StorageClass StorageClass `xml:"http://s3.amazonaws.com/doc/2006-03-01/ StorageClass"`
+ Key string `xml:"Key"`
+ LastModified time.Time `xml:"LastModified"`
+ ETag string `xml:"ETag"`
+ Size int64 `xml:"Size"`
+ Owner CanonicalUser `xml:"Owner,omitempty"`
+ StorageClass StorageClass `xml:"StorageClass"`
}
func (t *ListEntry) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
type T ListEntry
var layout struct {
*T
- LastModified *xsdDateTime `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LastModified"`
+ LastModified *xsdDateTime `xml:"LastModified"`
}
layout.T = (*T)(t)
layout.LastModified = (*xsdDateTime)(&layout.T.LastModified)
@@ -612,7 +613,7 @@ func (t *ListEntry) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
type T ListEntry
var overlay struct {
*T
- LastModified *xsdDateTime `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LastModified"`
+ LastModified *xsdDateTime `xml:"LastModified"`
}
overlay.T = (*T)(t)
overlay.LastModified = (*xsdDateTime)(&overlay.T.LastModified)
@@ -965,10 +966,10 @@ func (b xsdBase64Binary) MarshalText() ([]byte, error) {
type xsdDateTime time.Time
func (t *xsdDateTime) UnmarshalText(text []byte) error {
- return _unmarshalTime(text, (*time.Time)(t), "2006-01-02T15:04:05.999999999")
+ return _unmarshalTime(text, (*time.Time)(t), s3TimeFormat)
}
func (t xsdDateTime) MarshalText() ([]byte, error) {
- return []byte((time.Time)(t).Format("2006-01-02T15:04:05.999999999")), nil
+ return []byte((time.Time)(t).Format(s3TimeFormat)), nil
}
func (t xsdDateTime) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
if (time.Time)(t).IsZero() {
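
Note: the layout/overlay structs in ListAllMyBucketsEntry and ListEntry above are the standard encoding/xml trick for overriding one field's encoding: define a method-free alias of the struct, embed a pointer to it, and shadow the time field with the xsdDateTime wrapper so only its text form changes. A condensed standalone sketch of the marshalling half of that pattern (type names simplified):

    package main

    import (
        "encoding/xml"
        "fmt"
        "time"
    )

    const s3TimeFormat = "2006-01-02T15:04:05.999Z07:00"

    type xsdDateTime time.Time

    func (t xsdDateTime) MarshalText() ([]byte, error) {
        return []byte(time.Time(t).Format(s3TimeFormat)), nil
    }

    type entry struct {
        Key          string
        LastModified time.Time
    }

    func (t entry) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
        type T entry // alias without methods, so encoding it cannot recurse into MarshalXML
        layout := struct {
            *T
            LastModified *xsdDateTime // shadows the promoted field, swapping in the custom format
        }{T: (*T)(&t)}
        layout.LastModified = (*xsdDateTime)(&layout.T.LastModified)
        return e.EncodeElement(layout, start)
    }

    func main() {
        out, _ := xml.Marshal(entry{Key: "1.zip", LastModified: time.Date(2011, 4, 9, 12, 34, 49, 0, time.UTC)})
        fmt.Println(string(out))
        // <entry><Key>1.zip</Key><LastModified>2011-04-09T12:34:49Z</LastModified></entry>
    }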