aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-09-02 14:20:47 -0700
committerChris Lu <chris.lu@gmail.com>2018-09-02 14:20:47 -0700
commit88f1d32cc4a09d399966d7fa1039c1a714758e43 (patch)
tree8777e248aa6b725f44a9183ea0495742951f8288
parentacd8836d27cec4b02bfe04339d1b555ab95f8d13 (diff)
downloadseaweedfs-88f1d32cc4a09d399966d7fa1039c1a714758e43.tar.xz
seaweedfs-88f1d32cc4a09d399966d7fa1039c1a714758e43.zip
add s3ChunkedReader
fix https://github.com/chrislusf/seaweedfs/issues/718
-rw-r--r--weed/s3api/chunked_reader_v4.go276
-rw-r--r--weed/s3api/s3api_auth.go90
-rw-r--r--weed/s3api/s3api_object_handlers.go8
3 files changed, 373 insertions, 1 deletions
diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go
new file mode 100644
index 000000000..35a97dffe
--- /dev/null
+++ b/weed/s3api/chunked_reader_v4.go
@@ -0,0 +1,276 @@
+package s3api
+
+// the related code is copied and modified from minio source code
+
+/*
+ * Minio Cloud Storage, (C) 2016 Minio, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "io"
+ "net/http"
+ "github.com/dustin/go-humanize"
+)
+
// Streaming AWS Signature Version '4' constants.
const (
	// streamingContentSHA256 is the x-amz-content-sha256 header value AWS
	// clients send to announce a signed, chunked (streaming) upload body.
	streamingContentSHA256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
)

// maxLineLength caps a chunk header line at 4 KiB; assumed <= bufio's
// default buffer size so a full header line fits in one ReadSlice call.
const maxLineLength = 4 * humanize.KiByte // assumed <= bufio.defaultBufSize 4KiB

// lineTooLong is generated as chunk header is bigger than 4KiB.
var errLineTooLong = errors.New("header line too long")

// Malformed encoding is generated when chunk header is wrongly formed.
var errMalformedEncoding = errors.New("malformed chunked encoding")
+
+// newSignV4ChunkedReader returns a new s3ChunkedReader that translates the data read from r
+// out of HTTP "chunked" format before returning it.
+// The s3ChunkedReader returns io.EOF when the final 0-length chunk is read.
+func newSignV4ChunkedReader(req *http.Request) (io.ReadCloser) {
+ return &s3ChunkedReader{
+ reader: bufio.NewReader(req.Body),
+ state: readChunkHeader,
+ }
+}
+
// Represents the overall state that is required for decoding a
// AWS Signature V4 chunked reader.
type s3ChunkedReader struct {
	reader         *bufio.Reader // buffered request body being decoded
	state          chunkState    // current position in the decode state machine
	lastChunk      bool          // set once the final zero-length chunk header is seen
	chunkSignature string        // signature parsed from the current chunk header
	n              uint64        // Unread bytes in chunk
	err            error         // sticky error; once set, decoding stops
}
+
// readS3ChunkHeader parses one chunk header line of the form
// "<hex-size>;chunk-signature=<hex-signature>\r\n", storing the decoded
// chunk size in cr.n and the signature in cr.chunkSignature.
// Errors are recorded in cr.err rather than returned; a zero-length
// chunk (end of stream) is flagged as io.EOF.
func (cr *s3ChunkedReader) readS3ChunkHeader() {
	// Read the first chunk line until CRLF.
	var hexChunkSize, hexChunkSignature []byte
	hexChunkSize, hexChunkSignature, cr.err = readChunkLine(cr.reader)
	if cr.err != nil {
		return
	}
	// <hex>;token=value - converts the hex into its uint64 form.
	cr.n, cr.err = parseHexUint(hexChunkSize)
	if cr.err != nil {
		return
	}
	// A zero-length chunk marks the end of the stream.
	if cr.n == 0 {
		cr.err = io.EOF
	}
	// Save the incoming chunk signature.
	cr.chunkSignature = string(hexChunkSignature)
}
+
// chunkState enumerates the phases of decoding one signed chunk.
type chunkState int

const (
	readChunkHeader chunkState = iota
	readChunkTrailer
	readChunk
	verifyChunk
	eofChunk
)

// String returns a human-readable name for the state, or the empty
// string for an unrecognized value.
func (cs chunkState) String() string {
	switch cs {
	case readChunkHeader:
		return "readChunkHeader"
	case readChunkTrailer:
		return "readChunkTrailer"
	case readChunk:
		return "readChunk"
	case verifyChunk:
		return "verifyChunk"
	case eofChunk:
		return "eofChunk"
	}
	return ""
}
+
+func (cr *s3ChunkedReader) Close() (err error) {
+ return nil
+}
+
+// Read - implements `io.Reader`, which transparently decodes
+// the incoming AWS Signature V4 streaming signature.
+func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
+ for {
+ switch cr.state {
+ case readChunkHeader:
+ cr.readS3ChunkHeader()
+ // If we're at the end of a chunk.
+ if cr.n == 0 && cr.err == io.EOF {
+ cr.state = readChunkTrailer
+ cr.lastChunk = true
+ continue
+ }
+ if cr.err != nil {
+ return 0, cr.err
+ }
+ cr.state = readChunk
+ case readChunkTrailer:
+ cr.err = readCRLF(cr.reader)
+ if cr.err != nil {
+ return 0, errMalformedEncoding
+ }
+ cr.state = verifyChunk
+ case readChunk:
+ // There is no more space left in the request buffer.
+ if len(buf) == 0 {
+ return n, nil
+ }
+ rbuf := buf
+ // The request buffer is larger than the current chunk size.
+ // Read only the current chunk from the underlying reader.
+ if uint64(len(rbuf)) > cr.n {
+ rbuf = rbuf[:cr.n]
+ }
+ var n0 int
+ n0, cr.err = cr.reader.Read(rbuf)
+ if cr.err != nil {
+ // We have lesser than chunk size advertised in chunkHeader, this is 'unexpected'.
+ if cr.err == io.EOF {
+ cr.err = io.ErrUnexpectedEOF
+ }
+ return 0, cr.err
+ }
+
+ // Update the bytes read into request buffer so far.
+ n += n0
+ buf = buf[n0:]
+ // Update bytes to be read of the current chunk before verifying chunk's signature.
+ cr.n -= uint64(n0)
+
+ // If we're at the end of a chunk.
+ if cr.n == 0 {
+ cr.state = readChunkTrailer
+ continue
+ }
+ case verifyChunk:
+ if cr.lastChunk {
+ cr.state = eofChunk
+ } else {
+ cr.state = readChunkHeader
+ }
+ case eofChunk:
+ return n, io.EOF
+ }
+ }
+}
+
+// readCRLF - check if reader only has '\r\n' CRLF character.
+// returns malformed encoding if it doesn't.
+func readCRLF(reader io.Reader) error {
+ buf := make([]byte, 2)
+ _, err := io.ReadFull(reader, buf[:2])
+ if err != nil {
+ return err
+ }
+ if buf[0] != '\r' || buf[1] != '\n' {
+ return errMalformedEncoding
+ }
+ return nil
+}
+
+// Read a line of bytes (up to \n) from b.
+// Give up if the line exceeds maxLineLength.
+// The returned bytes are owned by the bufio.Reader
+// so they are only valid until the next bufio read.
+func readChunkLine(b *bufio.Reader) ([]byte, []byte, error) {
+ buf, err := b.ReadSlice('\n')
+ if err != nil {
+ // We always know when EOF is coming.
+ // If the caller asked for a line, there should be a line.
+ if err == io.EOF {
+ err = io.ErrUnexpectedEOF
+ } else if err == bufio.ErrBufferFull {
+ err = errLineTooLong
+ }
+ return nil, nil, err
+ }
+ if len(buf) >= maxLineLength {
+ return nil, nil, errLineTooLong
+ }
+ // Parse s3 specific chunk extension and fetch the values.
+ hexChunkSize, hexChunkSignature := parseS3ChunkExtension(buf)
+ return hexChunkSize, hexChunkSignature, nil
+}
+
// trimTrailingWhitespace - trim trailing white space (space, tab, LF, CR).
// Uses the standard library bytes.TrimRight instead of a hand-rolled
// byte-by-byte loop; the cutset matches exactly the bytes isASCIISpace
// accepts, so behavior is unchanged.
func trimTrailingWhitespace(b []byte) []byte {
	return bytes.TrimRight(b, " \t\n\r")
}
+
// isASCIISpace reports whether b is one of the four ASCII whitespace
// bytes recognized in chunk headers: space, tab, LF, CR.
func isASCIISpace(b byte) bool {
	switch b {
	case ' ', '\t', '\n', '\r':
		return true
	}
	return false
}
+
+// Constant s3 chunk encoding signature.
+const s3ChunkSignatureStr = ";chunk-signature="
+
+// parses3ChunkExtension removes any s3 specific chunk-extension from buf.
+// For example,
+// "10000;chunk-signature=..." => "10000", "chunk-signature=..."
+func parseS3ChunkExtension(buf []byte) ([]byte, []byte) {
+ buf = trimTrailingWhitespace(buf)
+ semi := bytes.Index(buf, []byte(s3ChunkSignatureStr))
+ // Chunk signature not found, return the whole buffer.
+ if semi == -1 {
+ return buf, nil
+ }
+ return buf[:semi], parseChunkSignature(buf[semi:])
+}
+
+// parseChunkSignature - parse chunk signature.
+func parseChunkSignature(chunk []byte) []byte {
+ chunkSplits := bytes.SplitN(chunk, []byte(s3ChunkSignatureStr), 2)
+ return chunkSplits[1]
+}
+
// parseHexUint decodes v as a big-endian hexadecimal uint64.
// It rejects non-hex bytes and inputs longer than 16 hex digits
// (the digit is validated before the length check, matching the
// original error precedence).
func parseHexUint(v []byte) (uint64, error) {
	var n uint64
	for i, c := range v {
		var d byte
		switch {
		case c >= '0' && c <= '9':
			d = c - '0'
		case c >= 'a' && c <= 'f':
			d = c - 'a' + 10
		case c >= 'A' && c <= 'F':
			d = c - 'A' + 10
		default:
			return 0, errors.New("invalid byte in chunk length")
		}
		// A 17th digit would overflow a uint64.
		if i == 16 {
			return 0, errors.New("http chunk length too large")
		}
		n = n<<4 | uint64(d)
	}
	return n, nil
}
diff --git a/weed/s3api/s3api_auth.go b/weed/s3api/s3api_auth.go
new file mode 100644
index 000000000..f429ad28d
--- /dev/null
+++ b/weed/s3api/s3api_auth.go
@@ -0,0 +1,90 @@
+package s3api
+
+import (
+ "net/http"
+ "strings"
+)
+
// AWS Signature Version '4' constants.
const (
	// signV4Algorithm is the Authorization-header prefix for SigV4 requests.
	signV4Algorithm = "AWS4-HMAC-SHA256"
	// signV2Algorithm is the Authorization-header prefix for SigV2 requests.
	signV2Algorithm = "AWS"
)
+
// isRequestJWT reports whether the request carries a JWT bearer token
// in its Authorization header.
func isRequestJWT(r *http.Request) bool {
	auth := r.Header.Get("Authorization")
	return strings.HasPrefix(auth, "Bearer")
}
+
+// Verify if request has AWS Signature Version '4'.
+func isRequestSignatureV4(r *http.Request) bool {
+ return strings.HasPrefix(r.Header.Get("Authorization"), signV4Algorithm)
+}
+
+// Verify if request has AWS Signature Version '2'.
+func isRequestSignatureV2(r *http.Request) bool {
+ return (!strings.HasPrefix(r.Header.Get("Authorization"), signV4Algorithm) &&
+ strings.HasPrefix(r.Header.Get("Authorization"), signV2Algorithm))
+}
+
// isRequestPresignedSignatureV4 reports whether the request is an AWS
// presigned-URL request, Signature Version '4' (identified by the
// X-Amz-Credential query parameter).
func isRequestPresignedSignatureV4(r *http.Request) bool {
	query := r.URL.Query()
	_, found := query["X-Amz-Credential"]
	return found
}
+
// isRequestPresignedSignatureV2 reports whether the request is an AWS
// presigned-URL request, Signature Version '2' (identified by the
// AWSAccessKeyId query parameter).
func isRequestPresignedSignatureV2(r *http.Request) bool {
	query := r.URL.Query()
	_, found := query["AWSAccessKeyId"]
	return found
}
+
// isRequestPostPolicySignatureV4 reports whether the request is an AWS
// Post-policy (browser form upload) Signature Version '4' request:
// a POST whose Content-Type is multipart/form-data.
func isRequestPostPolicySignatureV4(r *http.Request) bool {
	if r.Method != http.MethodPost {
		return false
	}
	return strings.Contains(r.Header.Get("Content-Type"), "multipart/form-data")
}
+
+// Verify if the request has AWS Streaming Signature Version '4'. This is only valid for 'PUT' operation.
+func isRequestSignStreamingV4(r *http.Request) bool {
+ return r.Header.Get("x-amz-content-sha256") == streamingContentSHA256 &&
+ r.Method == http.MethodPut
+}
+
// Authorization type.
type authType int

// List of all supported auth types.
const (
	authTypeUnknown         authType = iota // unrecognized Authorization header
	authTypeAnonymous                       // no Authorization header at all
	authTypePresigned                       // presigned URL, SigV4 (X-Amz-Credential query)
	authTypePresignedV2                     // presigned URL, SigV2 (AWSAccessKeyId query)
	authTypePostPolicy                      // browser POST form upload with policy
	authTypeStreamingSigned                 // SigV4 streaming (chunked) upload
	authTypeSigned                          // SigV4 Authorization header
	authTypeSignedV2                        // SigV2 Authorization header
	authTypeJWT                             // JWT bearer token
)
+
+// Get request authentication type.
+func getRequestAuthType(r *http.Request) authType {
+ if isRequestSignatureV2(r) {
+ return authTypeSignedV2
+ } else if isRequestPresignedSignatureV2(r) {
+ return authTypePresignedV2
+ } else if isRequestSignStreamingV4(r) {
+ return authTypeStreamingSigned
+ } else if isRequestSignatureV4(r) {
+ return authTypeSigned
+ } else if isRequestPresignedSignatureV4(r) {
+ return authTypePresigned
+ } else if isRequestJWT(r) {
+ return authTypeJWT
+ } else if isRequestPostPolicySignatureV4(r) {
+ return authTypePostPolicy
+ } else if _, ok := r.Header["Authorization"]; !ok {
+ return authTypeAnonymous
+ }
+ return authTypeUnknown
+}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index f0d13af05..bd41a48fe 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -41,9 +41,15 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
+ rAuthType := getRequestAuthType(r)
+ dataReader := r.Body
+ if rAuthType == authTypeStreamingSigned{
+ dataReader = newSignV4ChunkedReader(r)
+ }
+
uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
- proxyReq, err := http.NewRequest("PUT", uploadUrl, r.Body)
+ proxyReq, err := http.NewRequest("PUT", uploadUrl, dataReader)
if err != nil {
glog.Errorf("NewRequest %s: %v", uploadUrl, err)