aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-09-12 00:46:12 -0700
committerChris Lu <chris.lu@gmail.com>2018-09-12 00:46:12 -0700
commitf6d8525d1d8c1faf9a0826472bfcef0592ba3a93 (patch)
tree8856e342215ca37a943c97e09528ead83814e472 /weed
parent98d9aadd37500e31b98e91cbf1b4d4050d90e762 (diff)
downloadseaweedfs-f6d8525d1d8c1faf9a0826472bfcef0592ba3a93.tar.xz
seaweedfs-f6d8525d1d8c1faf9a0826472bfcef0592ba3a93.zip
working S3 multipart uploads
Diffstat (limited to 'weed')
-rw-r--r--weed/s3api/filer_multipart.go82
-rw-r--r--weed/s3api/filer_util.go4
-rw-r--r--weed/s3api/s3api_errors.go13
-rw-r--r--weed/s3api/s3api_object_handlers.go54
-rw-r--r--weed/s3api/s3api_object_multipart_handlers.go44
-rw-r--r--weed/s3api/s3api_server.go10
-rw-r--r--weed/server/filer_server_handlers_write.go3
-rw-r--r--weed/server/volume_server_handlers_write.go11
8 files changed, 126 insertions, 95 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index dbb1a3098..a688b917a 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -15,7 +15,11 @@ import (
"github.com/satori/go.uuid"
)
-func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *s3.CreateMultipartUploadOutput, code ErrorCode) {
+type InitiateMultipartUploadResult struct {
+ s3.CreateMultipartUploadOutput
+}
+
+func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code ErrorCode) {
uploadId, _ := uuid.NewV4()
uploadIdString := uploadId.String()
@@ -29,16 +33,22 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
return nil, ErrInternalError
}
- output = &s3.CreateMultipartUploadOutput{
- Bucket: input.Bucket,
- Key: input.Key,
- UploadId: aws.String(uploadIdString),
+ output = &InitiateMultipartUploadResult{
+ s3.CreateMultipartUploadOutput{
+ Bucket: input.Bucket,
+ Key: input.Key,
+ UploadId: aws.String(uploadIdString),
+ },
}
return
}
-func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *s3.CompleteMultipartUploadOutput, code ErrorCode) {
+type CompleteMultipartUploadResult struct {
+ s3.CompleteMultipartUploadOutput
+}
+
+func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code ErrorCode) {
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
@@ -54,13 +64,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
for _, entry := range entries {
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
for _, chunk := range entry.Chunks {
- finalParts = append(finalParts, &filer_pb.FileChunk{
+ p := &filer_pb.FileChunk{
FileId: chunk.FileId,
Offset: offset,
Size: chunk.Size,
Mtime: chunk.Mtime,
ETag: chunk.ETag,
- })
+ }
+ finalParts = append(finalParts, p)
offset += int64(chunk.Size)
}
}
@@ -71,6 +82,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
if dirName == "." {
dirName = ""
}
+ if strings.HasPrefix(dirName, "/") {
+ dirName = dirName[1:]
+ }
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
err = s3a.mkFile(dirName, entryName, finalParts)
@@ -80,10 +94,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, ErrInternalError
}
- output = &s3.CompleteMultipartUploadOutput{
- Bucket: input.Bucket,
- ETag: aws.String("\"" + filer2.ETag(finalParts) + "\""),
- Key: input.Key,
+ output = &CompleteMultipartUploadResult{
+ s3.CompleteMultipartUploadOutput{
+ Bucket: input.Bucket,
+ ETag: aws.String("\"" + filer2.ETag(finalParts) + "\""),
+ Key: input.Key,
+ },
}
return
@@ -107,15 +123,21 @@ func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput
return &s3.AbortMultipartUploadOutput{}, ErrNone
}
-func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *s3.ListMultipartUploadsOutput, code ErrorCode) {
+type ListMultipartUploadsResult struct {
+ s3.ListMultipartUploadsOutput
+}
+
+func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code ErrorCode) {
- output = &s3.ListMultipartUploadsOutput{
- Bucket: input.Bucket,
- Delimiter: input.Delimiter,
- EncodingType: input.EncodingType,
- KeyMarker: input.KeyMarker,
- MaxUploads: input.MaxUploads,
- Prefix: input.Prefix,
+ output = &ListMultipartUploadsResult{
+ s3.ListMultipartUploadsOutput{
+ Bucket: input.Bucket,
+ Delimiter: input.Delimiter,
+ EncodingType: input.EncodingType,
+ KeyMarker: input.KeyMarker,
+ MaxUploads: input.MaxUploads,
+ Prefix: input.Prefix,
+ },
}
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads))
@@ -136,13 +158,19 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
return
}
-func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *s3.ListPartsOutput, code ErrorCode) {
- output = &s3.ListPartsOutput{
- Bucket: input.Bucket,
- Key: input.Key,
- UploadId: input.UploadId,
- MaxParts: input.MaxParts, // the maximum number of parts to return.
- PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
+type ListPartsResult struct {
+ s3.ListPartsOutput
+}
+
+func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code ErrorCode) {
+ output = &ListPartsResult{
+ s3.ListPartsOutput{
+ Bucket: input.Bucket,
+ Key: input.Key,
+ UploadId: input.UploadId,
+ MaxParts: input.MaxParts, // the maximum number of parts to return.
+ PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
+ },
}
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId,
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index f0949389c..6dd93d50f 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -78,7 +78,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.ListEntriesRequest{
- Directory: s3a.option.BucketsPath,
+ Directory: parentDirectoryPath,
Prefix: prefix,
StartFromFileName: startFrom,
InclusiveStartFrom: inclusive,
@@ -135,7 +135,7 @@ func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isD
Name: entryName,
}
- glog.V(1).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
+ glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
resp, err := client.LookupDirectoryEntry(ctx, request)
if err != nil {
return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
diff --git a/weed/s3api/s3api_errors.go b/weed/s3api/s3api_errors.go
index e9975dbb6..7ba55ed28 100644
--- a/weed/s3api/s3api_errors.go
+++ b/weed/s3api/s3api_errors.go
@@ -40,9 +40,7 @@ const (
ErrInvalidMaxParts
ErrInvalidPartNumberMarker
ErrInvalidPart
- ErrInvalidPartOrder
ErrInternalError
- ErrMalformedXML
ErrNotImplemented
)
@@ -114,21 +112,12 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "We encountered an internal error, please try again.",
HTTPStatusCode: http.StatusInternalServerError,
},
- ErrMalformedXML: {
- Code: "MalformedXML",
- Description: "The XML you provided was not well-formed or did not validate against our published schema.",
- HTTPStatusCode: http.StatusBadRequest,
- },
+
ErrInvalidPart: {
Code: "InvalidPart",
Description: "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
HTTPStatusCode: http.StatusBadRequest,
},
- ErrInvalidPartOrder: {
- Code: "InvalidPartOrder",
- Description: "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
- HTTPStatusCode: http.StatusBadRequest,
- },
ErrNotImplemented: {
Code: "NotImplemented",
Description: "A header you provided implies functionality that is not implemented",
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 83a9186a9..b0a81083a 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -3,12 +3,15 @@ package s3api
import (
"encoding/json"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/gorilla/mux"
"io"
"io/ioutil"
"net/http"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/gorilla/mux"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "crypto/md5"
)
var (
@@ -21,19 +24,13 @@ func init() {
}}
}
-type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
-}
-
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
vars := mux.Vars(r)
bucket := vars["bucket"]
- object := vars["object"]
+ object := getObject(vars)
_, err := validateContentMd5(r.Header)
if err != nil {
@@ -47,7 +44,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
dataReader = newSignV4ChunkedReader(r)
}
- uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
+ uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
@@ -147,7 +144,10 @@ func passThroghResponse(proxyResonse *http.Response, w http.ResponseWriter) {
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) {
- proxyReq, err := http.NewRequest("PUT", uploadUrl, dataReader)
+ hash := md5.New()
+ var body io.Reader = io.TeeReader(dataReader, hash)
+
+ proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
if err != nil {
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
@@ -165,28 +165,30 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
resp, postErr := client.Do(proxyReq)
+ dataReader.Close()
+
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
return "", ErrInternalError
}
defer resp.Body.Close()
- etag = resp.Header.Get("ETag")
+ etag = fmt.Sprintf("%x", hash.Sum(nil))
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
glog.Errorf("upload to filer response read: %v", ra_err)
return etag, ErrInternalError
}
- var ret UploadResult
+ var ret weed_server.FilerPostResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
- return etag, ErrInternalError
+ return "", ErrInternalError
}
if ret.Error != "" {
glog.Errorf("upload to filer error: %v", ret.Error)
- return etag, ErrInternalError
+ return "", ErrInternalError
}
return etag, ErrNone
@@ -194,6 +196,26 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
func setEtag(w http.ResponseWriter, etag string) {
if etag != "" {
- w.Header().Set("ETag", "\""+etag+"\"")
+ if strings.HasPrefix(etag, "\"") {
+ w.Header().Set("ETag", etag)
+ } else {
+ w.Header().Set("ETag", "\""+etag+"\"")
+ }
+ }
+}
+
+func getObject(vars map[string]string) string {
+ object := vars["object"]
+ if !strings.HasPrefix(object, "/") {
+ object = "/" + object
+ }
+ return object
+}
+
+func getEtag(r *http.Request) (etag string) {
+ etag = r.Header.Get("ETag")
+ if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
+ etag = etag[1 : len(etag)-1]
}
+ return
}
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index d9baa9aae..140901917 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -1,15 +1,12 @@
package s3api
import (
- "encoding/xml"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/gorilla/mux"
- "io/ioutil"
"net/http"
"net/url"
- "sort"
"strconv"
"strings"
)
@@ -38,6 +35,8 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
return
}
+ // println("NewMultipartUploadHandler", string(encodeResponse(response)))
+
writeSuccessResponseXML(w, encodeResponse(response))
}
@@ -46,37 +45,19 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucket := vars["bucket"]
- object := vars["object"]
+ object := getObject(vars)
// Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query())
- completeMultipartBytes, err := ioutil.ReadAll(r.Body)
- if err != nil {
- writeErrorResponse(w, ErrInternalError, r.URL)
- return
- }
- completedMultipartUpload := &s3.CompletedMultipartUpload{}
- if err = xml.Unmarshal(completeMultipartBytes, completedMultipartUpload); err != nil {
- writeErrorResponse(w, ErrMalformedXML, r.URL)
- return
- }
- if len(completedMultipartUpload.Parts) == 0 {
- writeErrorResponse(w, ErrMalformedXML, r.URL)
- return
- }
- if !sort.IsSorted(byCompletedPartNumber(completedMultipartUpload.Parts)) {
- writeErrorResponse(w, ErrInvalidPartOrder, r.URL)
- return
- }
-
response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
- Bucket: aws.String(bucket),
- Key: aws.String(object),
- MultipartUpload: completedMultipartUpload,
- UploadId: aws.String(uploadID),
+ Bucket: aws.String(bucket),
+ Key: aws.String(object),
+ UploadId: aws.String(uploadID),
})
+ // println("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
+
if errCode != ErrNone {
writeErrorResponse(w, errCode, r.URL)
return
@@ -90,7 +71,7 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucket := vars["bucket"]
- object := vars["object"]
+ object := getObject(vars)
// Get upload id.
uploadID, _, _, _ := getObjectResources(r.URL.Query())
@@ -106,6 +87,8 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
return
}
+ // println("AbortMultipartUploadHandler", string(encodeResponse(response)))
+
writeSuccessResponseXML(w, encodeResponse(response))
}
@@ -144,6 +127,7 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
}
// TODO handle encodingType
+ // println("ListMultipartUploadsHandler", string(encodeResponse(response)))
writeSuccessResponseXML(w, encodeResponse(response))
}
@@ -152,7 +136,7 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
bucket := vars["bucket"]
- object := vars["object"]
+ object := getObject(vars)
uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
if partNumberMarker < 0 {
@@ -177,6 +161,8 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
return
}
+ // println("ListObjectPartsHandler", string(encodeResponse(response)))
+
writeSuccessResponseXML(w, encodeResponse(response))
}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index a62b521de..6722a519c 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -43,11 +43,6 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
for _, bucket := range routers {
- // PutObject
- bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler)
- // PutBucket
- bucket.Methods("PUT").HandlerFunc(s3a.PutBucketHandler)
-
// HeadObject
bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(s3a.HeadObjectHandler)
// HeadBucket
@@ -66,6 +61,11 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
// ListMultipartUploads
bucket.Methods("GET").HandlerFunc(s3a.ListMultipartUploadsHandler).Queries("uploads", "")
+ // PutObject
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler)
+ // PutBucket
+ bucket.Methods("PUT").HandlerFunc(s3a.PutBucketHandler)
+
// DeleteObject
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(s3a.DeleteObjectHandler)
// DeleteBucket
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 2f9351fa1..394f32d88 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -117,7 +117,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- glog.V(0).Infof("request header %+v, urlLocation: %v", r.Header, urlLocation)
+ glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
u, _ := url.Parse(urlLocation)
@@ -221,6 +221,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
Fid: fileId,
Url: urlLocation,
}
+ setEtag(w, etag)
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index a0b142dea..64c59fa31 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -4,13 +4,14 @@ import (
"errors"
"fmt"
"net/http"
+ "strconv"
+ "time"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/topology"
- "strconv"
- "time"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@@ -175,6 +176,10 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
func setEtag(w http.ResponseWriter, etag string) {
if etag != "" {
- w.Header().Set("ETag", "\""+etag+"\"")
+ if strings.HasPrefix(etag, "\"") {
+ w.Header().Set("ETag", etag)
+ } else {
+ w.Header().Set("ETag", "\""+etag+"\"")
+ }
}
}