aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/s3api/s3api_objects_list_handlers.go244
-rw-r--r--weed/s3api/s3api_objects_list_handlers_test.go52
2 files changed, 181 insertions, 115 deletions
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 448b082eb..620969fd6 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -9,7 +9,6 @@ import (
"io"
"net/http"
"net/url"
- "path/filepath"
"strconv"
"strings"
"time"
@@ -126,105 +125,87 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
writeSuccessResponseXML(w, r, response)
}
-func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, marker string, delimiter string) (response ListBucketResult, err error) {
+func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys int, originalMarker string, delimiter string) (response ListBucketResult, err error) {
// convert full path prefix into directory name and prefix for entry name
- reqDir, prefix := filepath.Split(originalPrefix)
- if strings.HasPrefix(reqDir, "/") {
- reqDir = reqDir[1:]
- }
+ requestDir, prefix, marker := normalizePrefixMarker(originalPrefix, originalMarker)
bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
- bucketPrefixLen := len(bucketPrefix)
- reqDir = fmt.Sprintf("%s%s", bucketPrefix, reqDir)
- if strings.HasSuffix(reqDir, "/") {
- reqDir = strings.TrimSuffix(reqDir, "/")
+ reqDir := bucketPrefix[:len(bucketPrefix)-1]
+ if requestDir != "" {
+ reqDir = fmt.Sprintf("%s%s", bucketPrefix, requestDir)
}
var contents []ListEntry
var commonPrefixes []PrefixEntry
- var isTruncated bool
var doErr error
var nextMarker string
+ cursor := &ListingCursor{
+ maxKeys: maxKeys,
+ }
// check filer
err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- _, isTruncated, nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, maxKeys, marker, delimiter, false, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) {
+ nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, func(dir string, entry *filer_pb.Entry) {
if entry.IsDirectory {
- if delimiter == "/" {
+ // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
+ if delimiter == "/" { // A response can contain CommonPrefixes only if you specify a delimiter.
commonPrefixes = append(commonPrefixes, PrefixEntry{
- Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:],
+ Prefix: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
})
- }
- if !(entry.IsDirectoryKeyObject() && strings.HasSuffix(entry.Name, "/")) {
- return
- }
- }
- storageClass := "STANDARD"
- if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
- storageClass = string(v)
- }
- contents = append(contents, ListEntry{
- Key: fmt.Sprintf("%s/%s", dir, entry.Name)[bucketPrefixLen:],
- LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
- ETag: "\"" + filer.ETag(entry) + "\"",
- Size: int64(filer.FileSize(entry)),
- Owner: CanonicalUser{
- ID: fmt.Sprintf("%x", entry.Attributes.Uid),
- DisplayName: entry.Attributes.UserName,
- },
- StorageClass: StorageClass(storageClass),
- })
- })
- glog.V(4).Infof("end doListFilerEntries isTruncated:%v nextMarker:%v reqDir: %v prefix: %v", isTruncated, nextMarker, reqDir, prefix)
- if doErr != nil {
- return doErr
- }
-
- if !isTruncated {
- nextMarker = ""
- }
-
- if len(contents) == 0 && len(commonPrefixes) == 0 && maxKeys > 0 {
- if strings.HasSuffix(originalPrefix, "/") && prefix == "" {
- reqDir, prefix = filepath.Split(strings.TrimSuffix(reqDir, "/"))
- reqDir = strings.TrimSuffix(reqDir, "/")
- }
- _, _, _, doErr = s3a.doListFilerEntries(client, reqDir, prefix, 1, prefix, delimiter, true, false, bucketPrefixLen, func(dir string, entry *filer_pb.Entry) {
- if entry.IsDirectoryKeyObject() && entry.Name == prefix {
- storageClass := "STANDARD"
- if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
- storageClass = string(v)
- }
+ //All of the keys (up to 1,000) rolled up into a common prefix count as a single return when calculating the number of returns.
+ cursor.maxKeys--
+ } else if entry.IsDirectoryKeyObject() {
contents = append(contents, ListEntry{
- Key: fmt.Sprintf("%s/%s/", dir, entry.Name)[bucketPrefixLen:],
+ Key: fmt.Sprintf("%s/%s/", dir, entry.Name)[len(bucketPrefix):],
LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
- ETag: "\"" + fmt.Sprintf("%x", entry.Attributes.Md5) + "\"",
- Size: int64(filer.FileSize(entry)),
+ ETag: "\"" + filer.ETag(entry) + "\"",
Owner: CanonicalUser{
ID: fmt.Sprintf("%x", entry.Attributes.Uid),
DisplayName: entry.Attributes.UserName,
},
- StorageClass: StorageClass(storageClass),
+ StorageClass: "STANDARD",
})
+ cursor.maxKeys--
}
- })
- if doErr != nil {
- return doErr
+ } else {
+ storageClass := "STANDARD"
+ if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
+ storageClass = string(v)
+ }
+ contents = append(contents, ListEntry{
+ Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
+ LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
+ ETag: "\"" + filer.ETag(entry) + "\"",
+ Size: int64(filer.FileSize(entry)),
+ Owner: CanonicalUser{
+ ID: fmt.Sprintf("%x", entry.Attributes.Uid),
+ DisplayName: entry.Attributes.UserName,
+ },
+ StorageClass: StorageClass(storageClass),
+ })
+ cursor.maxKeys--
}
+ })
+ if doErr != nil {
+ return doErr
}
- if len(nextMarker) > 0 {
- nextMarker = nextMarker[bucketPrefixLen:]
+ if !cursor.isTruncated {
+ nextMarker = ""
+ } else {
+ if requestDir != "" {
+ nextMarker = requestDir + "/" + nextMarker
+ }
}
response = ListBucketResult{
Name: bucket,
Prefix: originalPrefix,
- Marker: marker,
+ Marker: originalMarker,
NextMarker: nextMarker,
MaxKeys: maxKeys,
Delimiter: delimiter,
- IsTruncated: isTruncated,
+ IsTruncated: cursor.isTruncated,
Contents: contents,
CommonPrefixes: commonPrefixes,
}
@@ -235,7 +216,64 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
return
}
-func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, maxKeys int, marker, delimiter string, inclusiveStartFrom bool, subEntries bool, bucketPrefixLen int, eachEntryFn func(dir string, entry *filer_pb.Entry)) (counter int, isTruncated bool, nextMarker string, err error) {
+type ListingCursor struct {
+ maxKeys int
+ isTruncated bool
+}
+
+// the prefix and marker may be in different directories
+// normalizePrefixMarker ensures the prefix and marker both starts from the same directory
+func normalizePrefixMarker(prefix, marker string) (alignedDir, alignedPrefix, alignedMarker string) {
+ // alignedDir should not end with "/"
+ // alignedDir, alignedPrefix, alignedMarker should only have "/" in middle
+ prefix = strings.TrimLeft(prefix, "/")
+ marker = strings.TrimLeft(marker, "/")
+ if prefix == "" {
+ return "", "", marker
+ }
+ if marker == "" {
+ alignedDir, alignedPrefix = toDirAndName(prefix)
+ return
+ }
+ if !strings.HasPrefix(marker, prefix) {
+ // something wrong
+ return "", prefix, marker
+ }
+ if strings.HasPrefix(marker, prefix+"/") {
+ alignedDir = prefix
+ alignedPrefix = ""
+ alignedMarker = marker[len(alignedDir)+1:]
+ return
+ }
+
+ alignedDir, alignedPrefix = toDirAndName(prefix)
+ if alignedDir != "" {
+ alignedMarker = marker[len(alignedDir)+1:]
+ } else {
+ alignedMarker = marker
+ }
+ return
+}
+func toDirAndName(dirAndName string) (dir, name string) {
+ sepIndex := strings.LastIndex(dirAndName, "/")
+ if sepIndex >= 0 {
+ dir, name = dirAndName[0:sepIndex], dirAndName[sepIndex+1:]
+ } else {
+ name = dirAndName
+ }
+ return
+}
+func toParentAndDescendants(dirAndName string) (dir, name string) {
+ sepIndex := strings.Index(dirAndName, "/")
+ if sepIndex >= 0 {
+ dir, name = dirAndName[0:sepIndex], dirAndName[sepIndex+1:]
+ } else {
+ name = dirAndName
+ }
+ return
+}
+
+func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
// invariants
// prefix and marker should be under dir, marker may contain "/"
// maxKeys should be updated for each recursion
@@ -243,37 +281,23 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
if prefix == "/" && delimiter == "/" {
return
}
- if maxKeys <= 0 {
+ if cursor.maxKeys <= 0 {
return
}
if strings.Contains(marker, "/") {
- if strings.HasSuffix(marker, "/") {
- marker = strings.TrimSuffix(marker, "/")
- }
- sepIndex := strings.Index(marker, "/")
- if sepIndex != -1 {
- subPrefix, subMarker := marker[0:sepIndex], marker[sepIndex+1:]
- var subDir string
- if len(dir) > bucketPrefixLen && dir[bucketPrefixLen:] == subPrefix {
- subDir = dir
- } else {
- subDir = fmt.Sprintf("%s/%s", dir, subPrefix)
- }
- subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, subDir, "", maxKeys, subMarker, delimiter, false, false, bucketPrefixLen, eachEntryFn)
- if subErr != nil {
- err = subErr
- return
- }
- counter += subCounter
- isTruncated = isTruncated || subIsTruncated
- maxKeys -= subCounter
- nextMarker = subNextMarker
- // finished processing this sub directory
- marker = subPrefix
+ subDir, subMarker := toParentAndDescendants(marker)
+ // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker)
+ subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", cursor, subMarker, delimiter, false, eachEntryFn)
+ if subErr != nil {
+ err = subErr
+ return
}
+ nextMarker = subDir + "/" + subNextMarker
+ // finished processing this sub directory
+ marker = subDir
}
- if maxKeys <= 0 {
+ if cursor.maxKeys <= 0 {
return
}
@@ -281,7 +305,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
request := &filer_pb.ListEntriesRequest{
Directory: dir,
Prefix: prefix,
- Limit: uint32(maxKeys + 2), // bucket root directory needs to skip additional s3_constants.MultipartUploadsFolder folder
+ Limit: uint32(cursor.maxKeys + 2), // bucket root directory needs to skip additional s3_constants.MultipartUploadsFolder folder
StartFromFileName: marker,
InclusiveStartFrom: inclusiveStartFrom,
}
@@ -304,38 +328,31 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
return
}
}
- if counter >= maxKeys {
- isTruncated = true
+ if cursor.maxKeys <= 0 {
+ cursor.isTruncated = true
return
}
entry := resp.Entry
- nextMarker = dir + "/" + entry.Name
+ nextMarker = entry.Name
if entry.IsDirectory {
// println("ListEntries", dir, "dir:", entry.Name)
if entry.Name == s3_constants.MultipartUploadsFolder { // FIXME no need to apply to all directories. this extra also affects maxKeys
continue
}
- if delimiter == "" {
+ if delimiter != "/" {
eachEntryFn(dir, entry)
- // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter)
- subCounter, subIsTruncated, subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", maxKeys-counter, "", delimiter, false, true, bucketPrefixLen, eachEntryFn)
+ subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", cursor, "", delimiter, false, eachEntryFn)
if subErr != nil {
err = fmt.Errorf("doListFilerEntries2: %v", subErr)
return
}
- // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "maxKeys", maxKeys-counter, "subCounter", subCounter, "subNextMarker", subNextMarker, "subIsTruncated", subIsTruncated)
- if subCounter == 0 && entry.IsDirectoryKeyObject() {
- entry.Name += "/"
- eachEntryFn(dir, entry)
- counter++
- }
- counter += subCounter
- nextMarker = subNextMarker
- if subIsTruncated {
- isTruncated = true
+ // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "subNextMarker", subNextMarker)
+ nextMarker = entry.Name + "/" + subNextMarker
+ if cursor.isTruncated {
return
}
- } else if delimiter == "/" {
+ // println("doListFilerEntries2 nextMarker", nextMarker)
+ } else {
var isEmpty bool
if !s3a.option.AllowEmptyFolder && !entry.IsDirectoryKeyObject() {
if isEmpty, err = s3a.ensureDirectoryAllEmpty(client, dir, entry.Name); err != nil {
@@ -343,15 +360,12 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
}
}
if !isEmpty {
- nextMarker += "/"
eachEntryFn(dir, entry)
- counter++
}
}
- } else if !(delimiter == "/" && subEntries) {
- // println("ListEntries", dir, "file:", entry.Name)
+ } else {
eachEntryFn(dir, entry)
- counter++
+ // println("ListEntries", dir, "file:", entry.Name, "maxKeys", cursor.maxKeys)
}
}
return
diff --git a/weed/s3api/s3api_objects_list_handlers_test.go b/weed/s3api/s3api_objects_list_handlers_test.go
index 8734ff2ff..585aa40b1 100644
--- a/weed/s3api/s3api_objects_list_handlers_test.go
+++ b/weed/s3api/s3api_objects_list_handlers_test.go
@@ -2,6 +2,7 @@ package s3api
import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+ "github.com/stretchr/testify/assert"
"testing"
"time"
)
@@ -37,3 +38,54 @@ func TestListObjectsHandler(t *testing.T) {
t.Errorf("unexpected output: %s\nexpecting:%s", encoded, expected)
}
}
+
+func Test_normalizePrefixMarker(t *testing.T) {
+ type args struct {
+ prefix string
+ marker string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantAlignedDir string
+ wantAlignedPrefix string
+ wantAlignedMarker string
+ }{
+ {"prefix is a directory",
+ args{"/parentDir/data/",
+ ""},
+ "parentDir/data",
+ "",
+ "",
+ },
+ {"normal case",
+ args{"/parentDir/data/0",
+ "parentDir/data/0e/0e149049a2137b0cc12e"},
+ "parentDir/data",
+ "0",
+ "0e/0e149049a2137b0cc12e",
+ },
+ {"empty prefix",
+ args{"",
+ "parentDir/data/0e/0e149049a2137b0cc12e"},
+ "",
+ "",
+ "parentDir/data/0e/0e149049a2137b0cc12e",
+ },
+ {"empty directory",
+ args{"parent",
+ "parentDir/data/0e/0e149049a2137b0cc12e"},
+ "",
+ "parent",
+ "parentDir/data/0e/0e149049a2137b0cc12e",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ gotAlignedDir, gotAlignedPrefix, gotAlignedMarker := normalizePrefixMarker(tt.args.prefix, tt.args.marker)
+ assert.Equalf(t, tt.wantAlignedDir, gotAlignedDir, "normalizePrefixMarker(%v, %v)", tt.args.prefix, tt.args.marker)
+ assert.Equalf(t, tt.wantAlignedPrefix, gotAlignedPrefix, "normalizePrefixMarker(%v, %v)", tt.args.prefix, tt.args.marker)
+ assert.Equalf(t, tt.wantAlignedMarker, gotAlignedMarker, "normalizePrefixMarker(%v, %v)", tt.args.prefix, tt.args.marker)
+ })
+ }
+}