aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2024-04-06 22:56:39 +0500
committerGitHub <noreply@github.com>2024-04-06 10:56:39 -0700
commitd42a04cceb51d06ae6ec0e80ca8675969018d98b (patch)
treef54efe78482ce76ff9d36a8c127dc9f01a675073
parent7aa25c113736f9078b34d169b9bfcd4a677a65c4 (diff)
downloadseaweedfs-d42a04cceb51d06ae6ec0e80ca8675969018d98b.tar.xz
seaweedfs-d42a04cceb51d06ae6ec0e80ca8675969018d98b.zip
[s3] fix s3 test_multipart_resend_first_finishes_last (#5471)
* try fix s3 test https://github.com/seaweedfs/seaweedfs/pull/5466 * add error handler metrics * refactor * refactor multipartExt * delete bad entry parts
-rw-r--r--weed/s3api/filer_multipart.go149
-rw-r--r--weed/s3api/filer_multipart_test.go81
-rw-r--r--weed/stats/metrics.go9
-rw-r--r--weed/stats/metrics_names.go9
4 files changed, 101 insertions, 147 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 7dd68466e..e8491fce7 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -1,9 +1,12 @@
package s3api
import (
+ "cmp"
"encoding/hex"
"encoding/xml"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "golang.org/x/exp/slices"
"math"
"path/filepath"
"sort"
@@ -11,18 +14,18 @@ import (
"strings"
"time"
- "github.com/google/uuid"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- "golang.org/x/exp/slices"
-
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
+ "github.com/google/uuid"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
+const multipartExt = ".part"
+
type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
s3.CreateMultipartUploadOutput
@@ -71,74 +74,97 @@ type CompleteMultipartUploadResult struct {
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input)
-
- completedParts := parts.Parts
- slices.SortFunc(completedParts, func(a, b CompletedPart) int {
- return a.PartNumber - b.PartNumber
- })
-
+ completedPartNumbers := []int{}
+ completedPartMap := make(map[int][]string)
+ for _, part := range parts.Parts {
+ if _, ok := completedPartMap[part.PartNumber]; !ok {
+ completedPartNumbers = append(completedPartNumbers, part.PartNumber)
+ }
+ completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
+ }
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
entries, _, err := s3a.list(uploadDirectory, "", "", false, maxPartsList)
if err != nil || len(entries) == 0 {
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
+ deleteEntries := []*filer_pb.Entry{}
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
for _, entry := range entries {
+ foundEntry := false
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
- if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
- var partNumberString string
- index := strings.Index(entry.Name, "_")
- if index != -1 {
- partNumberString = entry.Name[:index]
+ if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
+ continue
+ }
+ partNumber, err := parsePartNumber(entry.Name)
+ if err != nil {
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
+ glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err)
+ continue
+ }
+ completedPartsByNumber, ok := completedPartMap[partNumber]
+ if !ok {
+ continue
+ }
+ for _, partETag := range completedPartsByNumber {
+ partETag = strings.Trim(partETag, `"`)
+ entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
+ if partETag != "" && len(partETag) == 32 && entryETag != "" {
+ if entryETag != partETag {
+ glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
+ continue
+ }
} else {
- partNumberString = entry.Name[:len(entry.Name)-len(".part")]
+ glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag)
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
}
- partNumber, err := strconv.Atoi(partNumberString)
- if err != nil {
- glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", partNumberString, err)
+ if len(entry.Chunks) == 0 {
+ glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
continue
}
//there maybe multi same part, because of client retry
partEntries[partNumber] = append(partEntries[partNumber], entry)
+ foundEntry = true
+ }
+ if !foundEntry {
+ deleteEntries = append(deleteEntries, entry)
}
-
}
mime := pentry.Attributes.Mime
var finalParts []*filer_pb.FileChunk
var offset int64
- var deleteEntries []*filer_pb.Entry
- for _, part := range completedParts {
- entries := partEntries[part.PartNumber]
- // check whether completedParts is more than received parts
- if len(entries) == 0 {
- glog.Errorf("part %d has no entry", part.PartNumber)
+ sort.Ints(completedPartNumbers)
+ for _, partNumber := range completedPartNumbers {
+ partEntriesByNumber, ok := partEntries[partNumber]
+ if !ok {
+ glog.Errorf("part %d has no entry", partNumber)
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
return nil, s3err.ErrInvalidPart
}
-
found := false
- for _, entry := range entries {
+ if len(partEntriesByNumber) > 1 {
+ slices.SortFunc(partEntriesByNumber, func(a, b *filer_pb.Entry) int {
+ return cmp.Compare(b.Chunks[0].ModifiedTsNs, a.Chunks[0].ModifiedTsNs)
+ })
+ }
+ for _, entry := range partEntriesByNumber {
if found {
deleteEntries = append(deleteEntries, entry)
- continue
- }
-
- partETag := strings.Trim(part.ETag, `"`)
- entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
- glog.Warningf("complete etag %s, partEtag %s", partETag, entryETag)
- if partETag != "" && len(partETag) == 32 && entryETag != "" && entryETag != partETag {
- err = fmt.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
- deleteEntries = append(deleteEntries, entry)
+ stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc()
continue
}
for _, chunk := range entry.GetChunks() {
@@ -154,13 +180,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
offset += int64(chunk.Size)
}
found = true
- err = nil
}
- if err != nil {
- glog.Errorf("%s", err)
- return nil, s3err.ErrInvalidPart
- }
-
}
entryName := filepath.Base(*input.Key)
@@ -223,29 +243,15 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return
}
-func findByPartNumber(fileName string, parts []CompletedPart) (etag string, found bool) {
- partNumber, formatErr := strconv.Atoi(fileName[:4])
- if formatErr != nil {
- return
- }
- x := sort.Search(len(parts), func(i int) bool {
- return parts[i].PartNumber >= partNumber
- })
- if x >= len(parts) {
- return
- }
- if parts[x].PartNumber != partNumber {
- return
+func parsePartNumber(fileName string) (int, error) {
+ var partNumberString string
+ index := strings.Index(fileName, "_")
+ if index != -1 {
+ partNumberString = fileName[:index]
+ } else {
+ partNumberString = fileName[:len(fileName)-len(multipartExt)]
}
- y := 0
- for i, part := range parts[x:] {
- if part.PartNumber == partNumber {
- y = i
- } else {
- break
- }
- }
- return parts[x+y].ETag, true
+ return strconv.Atoi(partNumberString)
}
func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
@@ -361,7 +367,7 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
StorageClass: aws.String("STANDARD"),
}
- entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d.part", *input.PartNumberMarker), false, uint32(*input.MaxParts))
+ entries, isLast, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId, "", fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt), false, uint32(*input.MaxParts))
if err != nil {
glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
return nil, s3err.ErrNoSuchUpload
@@ -373,15 +379,8 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
output.IsTruncated = aws.Bool(!isLast)
for _, entry := range entries {
- if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
- var partNumberString string
- index := strings.Index(entry.Name, "_")
- if index != -1 {
- partNumberString = entry.Name[:index]
- } else {
- partNumberString = entry.Name[:len(entry.Name)-len(".part")]
- }
- partNumber, err := strconv.Atoi(partNumberString)
+ if strings.HasSuffix(entry.Name, multipartExt) && !entry.IsDirectory {
+ partNumber, err := parsePartNumber(entry.Name)
if err != nil {
glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
continue
diff --git a/weed/s3api/filer_multipart_test.go b/weed/s3api/filer_multipart_test.go
index e76d903b8..7f75a40de 100644
--- a/weed/s3api/filer_multipart_test.go
+++ b/weed/s3api/filer_multipart_test.go
@@ -50,88 +50,27 @@ func TestListPartsResult(t *testing.T) {
}
-func Test_findByPartNumber(t *testing.T) {
- type args struct {
- fileName string
- parts []CompletedPart
- }
-
- parts := []CompletedPart{
- {
- ETag: "xxx",
- PartNumber: 1,
- },
- {
- ETag: "lll",
- PartNumber: 1,
- },
- {
- ETag: "yyy",
- PartNumber: 3,
- },
- {
- ETag: "zzz",
- PartNumber: 5,
- },
- }
-
+func Test_parsePartNumber(t *testing.T) {
tests := []struct {
- name string
- args args
- wantEtag string
- wantFound bool
+ name string
+ fileName string
+ partNum int
}{
{
"first",
- args{
- "0001.part",
- parts,
- },
- "lll",
- true,
+ "0001_uuid.part",
+ 1,
},
{
"second",
- args{
- "0002.part",
- parts,
- },
- "",
- false,
- },
- {
- "third",
- args{
- "0003.part",
- parts,
- },
- "yyy",
- true,
- },
- {
- "fourth",
- args{
- "0004.part",
- parts,
- },
- "",
- false,
- },
- {
- "fifth",
- args{
- "0005.part",
- parts,
- },
- "zzz",
- true,
+ "0002.part",
+ 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- gotEtag, gotFound := findByPartNumber(tt.args.fileName, tt.args.parts)
- assert.Equalf(t, tt.wantEtag, gotEtag, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
- assert.Equalf(t, tt.wantFound, gotFound, "findByPartNumber(%v, %v)", tt.args.fileName, tt.args.parts)
+ partNumber, _ := parsePartNumber(tt.fileName)
+ assert.Equalf(t, tt.partNum, partNumber, "parsePartNumber(%v)", tt.fileName)
})
}
}
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index f61f68e08..83391f047 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -241,7 +241,13 @@ var (
Name: "request_total",
Help: "Counter of s3 requests.",
}, []string{"type", "code", "bucket"})
-
+ S3HandlerCounter = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: "s3",
+ Name: "handler_total",
+ Help: "Counter of s3 server handlers.",
+ }, []string{"type"})
S3RequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: Namespace,
@@ -292,6 +298,7 @@ func init() {
Gather.MustRegister(VolumeServerResourceGauge)
Gather.MustRegister(S3RequestCounter)
+ Gather.MustRegister(S3HandlerCounter)
Gather.MustRegister(S3RequestHistogram)
Gather.MustRegister(S3TimeToFirstByteHistogram)
}
diff --git a/weed/stats/metrics_names.go b/weed/stats/metrics_names.go
index ffb0c76f1..cfc0fbeb0 100644
--- a/weed/stats/metrics_names.go
+++ b/weed/stats/metrics_names.go
@@ -43,4 +43,13 @@ const (
ErrorChunkAssign = "chunkAssign.failed"
ErrorReadCache = "read.cache.failed"
ErrorReadStream = "read.stream.failed"
+
+ // s3 handler
+ ErrorCompletedNoSuchUpload = "errorCompletedNoSuchUpload"
+ ErrorCompletedPartEmpty = "ErrorCompletedPartEmpty"
+ ErrorCompletedPartNumber = "ErrorCompletedPartNumber"
+ ErrorCompletedPartNotFound = "errorCompletedPartNotFound"
+ ErrorCompletedEtagInvalid = "errorCompletedEtagInvalid"
+ ErrorCompletedEtagMismatch = "errorCompletedEtagMismatch"
+ ErrorCompletedPartEntryMismatch = "errorCompletedPartEntryMismatch"
)