aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/s3api/filer_multipart.go76
-rw-r--r--weed/s3api/s3api_object_copy_handlers.go12
-rw-r--r--weed/s3api/s3api_object_multipart_handlers.go9
3 files changed, 66 insertions, 31 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 765a5679e..ac9bffe21 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -4,9 +4,6 @@ import (
"encoding/hex"
"encoding/xml"
"fmt"
- "github.com/google/uuid"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- "golang.org/x/exp/slices"
"math"
"path/filepath"
"sort"
@@ -14,6 +11,10 @@ import (
"strings"
"time"
+ "github.com/google/uuid"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+ "golang.org/x/exp/slices"
+
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
@@ -90,40 +91,55 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, s3err.ErrNoSuchUpload
}
- // check whether completedParts is more than received parts
- {
- partNumbers := make(map[int]struct{}, len(entries))
- for _, entry := range entries {
- if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
- partNumberString := entry.Name[:len(entry.Name)-len(".part")]
- partNumber, err := strconv.Atoi(partNumberString)
- if err == nil {
- partNumbers[partNumber] = struct{}{}
- }
+ partEntries := make(map[int][]*filer_pb.Entry, len(entries))
+ for _, entry := range entries {
+ glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
+ if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
+ var partNumberString string
+ index := strings.Index(entry.Name, "_")
+ if index != -1 {
+ partNumberString = entry.Name[:index]
+ } else {
+ partNumberString = entry.Name[:len(entry.Name)-len(".part")]
}
- }
- for _, part := range completedParts {
- if _, found := partNumbers[part.PartNumber]; !found {
- return nil, s3err.ErrInvalidPart
+ partNumber, err := strconv.Atoi(partNumberString)
+ if err != nil {
+ glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", partNumberString, err)
+ continue
}
+ //there maybe multi same part, because of client retry
+ partEntries[partNumber] = append(partEntries[partNumber], entry)
}
+
}
mime := pentry.Attributes.Mime
var finalParts []*filer_pb.FileChunk
var offset int64
+ var deleteEntries []*filer_pb.Entry
+ for _, part := range completedParts {
+ entries := partEntries[part.PartNumber]
+ // check whether completedParts is more than received parts
+ if len(entries) == 0 {
+ glog.Errorf("part %d has no entry", part.PartNumber)
+ return nil, s3err.ErrInvalidPart
+ }
- for _, entry := range entries {
- if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
- partETag, found := findByPartNumber(entry.Name, completedParts)
- if !found {
+ found := false
+ for _, entry := range entries {
+ if found {
+ deleteEntries = append(deleteEntries, entry)
continue
}
+
+ partETag := strings.Trim(part.ETag, `"`)
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
+ glog.Warningf("complete etag %s, partEtag %s", partETag, entryETag)
if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
- glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
- return nil, s3err.ErrInvalidPart
+ err = fmt.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
+ deleteEntries = append(deleteEntries, entry)
+ continue
}
for _, chunk := range entry.GetChunks() {
p := &filer_pb.FileChunk{
@@ -137,7 +153,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
finalParts = append(finalParts, p)
offset += int64(chunk.Size)
}
+ found = true
+ err = nil
+ }
+ if err != nil {
+ glog.Errorf("%s", err)
+ return nil, s3err.ErrInvalidPart
}
+
}
entryName := filepath.Base(*input.Key)
@@ -186,6 +209,13 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
},
}
+ for _, deleteEntry := range deleteEntries {
+ //delete unused part data
+ glog.Infof("completeMultipartUpload cleanup %s upload %s unused %s", *input.Bucket, *input.UploadId, deleteEntry.Name)
+ if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
+ glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
+ }
+ }
if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
}
diff --git a/weed/s3api/s3api_object_copy_handlers.go b/weed/s3api/s3api_object_copy_handlers.go
index 8dc33f213..8d13fe17e 100644
--- a/weed/s3api/s3api_object_copy_handlers.go
+++ b/weed/s3api/s3api_object_copy_handlers.go
@@ -2,16 +2,17 @@ package s3api
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- "modernc.org/strutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
+ "modernc.org/strutil"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@@ -170,8 +171,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
rangeHeader := r.Header.Get("x-amz-copy-source-range")
- dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
+ dstUrl := s3a.genPartUploadUrl(dstBucket, uploadID, partID)
srcUrl := fmt.Sprintf("http://%s%s/%s%s",
s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index 187022079..6fecdcf2d 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -10,6 +10,7 @@ import (
"strconv"
"strings"
+ "github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
@@ -247,8 +248,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
- uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID)
+ uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID)
if partID == 1 && r.Header.Get("Content-Type") == "" {
dataReader = mimeDetect(r, dataReader)
@@ -271,6 +271,11 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
}
+func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
+ return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
+ s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
+}
+
// Generate uploadID hash string from object
func (s3a *S3ApiServer) generateUploadID(object string) string {
if strings.HasPrefix(object, "/") {