aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/s3api/s3api_object_handlers_list.go4
-rw-r--r--weed/s3api/s3api_object_versioning.go379
2 files changed, 199 insertions, 184 deletions
diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go
index ad65bd4fe..a4e844df8 100644
--- a/weed/s3api/s3api_object_handlers_list.go
+++ b/weed/s3api/s3api_object_handlers_list.go
@@ -573,6 +573,10 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
break
}
+ // Update nextMarker to ensure pagination advances past this .versions directory
+ // This is critical to prevent infinite loops when results are truncated
+ nextMarker = versionsDir
+
// Extract object name from .versions directory name (remove .versions suffix)
baseObjectName := strings.TrimSuffix(versionsDir, s3_constants.VersionsFolder)
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go
index bbc43f205..a2a7c3a3d 100644
--- a/weed/s3api/s3api_object_versioning.go
+++ b/weed/s3api/s3api_object_versioning.go
@@ -264,225 +264,236 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
// findVersionsRecursively searches for all .versions directories and regular files recursively
func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string) error {
- // List entries in current directory
- entries, _, err := s3a.list(currentPath, "", "", false, 1000)
- if err != nil {
- return err
- }
+ // List entries in current directory with pagination
+ startFrom := ""
+ for {
+ entries, isLast, err := s3a.list(currentPath, "", startFrom, false, filer.PaginationSize)
+ if err != nil {
+ return err
+ }
- for _, entry := range entries {
- entryPath := path.Join(relativePath, entry.Name)
-
- // Skip if this doesn't match the prefix filter
- if normalizedPrefix := strings.TrimPrefix(prefix, "/"); normalizedPrefix != "" {
- // An entry is a candidate if:
- // 1. Its path is a match for the prefix.
- // 2. It is a directory that is an ancestor of the prefix path, so we must descend into it.
-
- // Condition 1: The entry's path starts with the prefix.
- isMatch := strings.HasPrefix(entryPath, normalizedPrefix)
- if !isMatch && entry.IsDirectory {
- // Also check if a directory entry matches a directory-style prefix (e.g., prefix "a/", entry "a").
- isMatch = strings.HasPrefix(entryPath+"/", normalizedPrefix)
- }
+ for _, entry := range entries {
+ // Track last entry name for pagination
+ startFrom = entry.Name
- // Condition 2: The prefix path starts with the entry's path (and it's a directory).
- canDescend := entry.IsDirectory && strings.HasPrefix(normalizedPrefix, entryPath)
+ entryPath := path.Join(relativePath, entry.Name)
- if !isMatch && !canDescend {
- continue
- }
- }
+ // Skip if this doesn't match the prefix filter
+ if normalizedPrefix := strings.TrimPrefix(prefix, "/"); normalizedPrefix != "" {
+ // An entry is a candidate if:
+ // 1. Its path is a match for the prefix.
+ // 2. It is a directory that is an ancestor of the prefix path, so we must descend into it.
- if entry.IsDirectory {
- // Skip .uploads directory (multipart upload temporary files)
- if strings.HasPrefix(entry.Name, ".uploads") {
- continue
- }
+ // Condition 1: The entry's path starts with the prefix.
+ isMatch := strings.HasPrefix(entryPath, normalizedPrefix)
+ if !isMatch && entry.IsDirectory {
+ // Also check if a directory entry matches a directory-style prefix (e.g., prefix "a/", entry "a").
+ isMatch = strings.HasPrefix(entryPath+"/", normalizedPrefix)
+ }
- // Check if this is a .versions directory
- if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
- // Extract object name from .versions directory name
- objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder)
- normalizedObjectKey := removeDuplicateSlashes(objectKey)
- // Mark both keys as processed for backward compatibility
- processedObjects[objectKey] = true
- processedObjects[normalizedObjectKey] = true
+ // Condition 2: The prefix path starts with the entry's path (and it's a directory).
+ canDescend := entry.IsDirectory && strings.HasPrefix(normalizedPrefix, entryPath)
- glog.V(2).Infof("Found .versions directory for object %s (normalized: %s)", objectKey, normalizedObjectKey)
+ if !isMatch && !canDescend {
+ continue
+ }
+ }
- versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
- if err != nil {
- glog.Warningf("Failed to get versions for object %s (normalized: %s): %v", objectKey, normalizedObjectKey, err)
+ if entry.IsDirectory {
+ // Skip .uploads directory (multipart upload temporary files)
+ if strings.HasPrefix(entry.Name, ".uploads") {
continue
}
- for _, version := range versions {
- // Check for duplicate version IDs and skip if already seen
- // Use normalized key for deduplication
- versionKey := normalizedObjectKey + ":" + version.VersionId
- if seenVersionIds[versionKey] {
- glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, normalizedObjectKey)
+ // Check if this is a .versions directory
+ if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
+ // Extract object name from .versions directory name
+ objectKey := strings.TrimSuffix(entryPath, s3_constants.VersionsFolder)
+ normalizedObjectKey := removeDuplicateSlashes(objectKey)
+ // Mark both keys as processed for backward compatibility
+ processedObjects[objectKey] = true
+ processedObjects[normalizedObjectKey] = true
+
+ glog.V(2).Infof("Found .versions directory for object %s (normalized: %s)", objectKey, normalizedObjectKey)
+
+ versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
+ if err != nil {
+ glog.Warningf("Failed to get versions for object %s (normalized: %s): %v", objectKey, normalizedObjectKey, err)
continue
}
- seenVersionIds[versionKey] = true
-
- if version.IsDeleteMarker {
- glog.V(4).Infof("Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
- normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
- deleteMarker := &DeleteMarkerEntry{
- Key: normalizedObjectKey, // Use normalized key for consistency
- VersionId: version.VersionId,
- IsLatest: version.IsLatest,
- LastModified: version.LastModified,
- Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
+
+ for _, version := range versions {
+ // Check for duplicate version IDs and skip if already seen
+ // Use normalized key for deduplication
+ versionKey := normalizedObjectKey + ":" + version.VersionId
+ if seenVersionIds[versionKey] {
+ glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, normalizedObjectKey)
+ continue
}
- *allVersions = append(*allVersions, deleteMarker)
- } else {
- glog.V(4).Infof("Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
- normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
+ seenVersionIds[versionKey] = true
+
+ if version.IsDeleteMarker {
+ glog.V(4).Infof("Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
+ normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
+ deleteMarker := &DeleteMarkerEntry{
+ Key: normalizedObjectKey, // Use normalized key for consistency
+ VersionId: version.VersionId,
+ IsLatest: version.IsLatest,
+ LastModified: version.LastModified,
+ Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
+ }
+ *allVersions = append(*allVersions, deleteMarker)
+ } else {
+ glog.V(4).Infof("Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
+ normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
+ versionEntry := &VersionEntry{
+ Key: normalizedObjectKey, // Use normalized key for consistency
+ VersionId: version.VersionId,
+ IsLatest: version.IsLatest,
+ LastModified: version.LastModified,
+ ETag: version.ETag,
+ Size: version.Size,
+ Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
+ StorageClass: "STANDARD",
+ }
+ *allVersions = append(*allVersions, versionEntry)
+ }
+ }
+ } else {
+ // This is a regular directory - check if it's an explicit S3 directory object
+ // Only include directories that were explicitly created via S3 API (have FolderMimeType)
+ // This excludes implicit directories created when uploading files like "test1/a"
+ if entry.Attributes.Mime == s3_constants.FolderMimeType {
+ directoryKey := entryPath
+ if !strings.HasSuffix(directoryKey, "/") {
+ directoryKey += "/"
+ }
+
+ // Add directory as a version entry with VersionId "null" (following S3/Minio behavior)
+ glog.V(2).Infof("findVersionsRecursively: found explicit S3 directory %s", directoryKey)
+
+ // Calculate ETag for empty directory
+ directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\""
+
versionEntry := &VersionEntry{
- Key: normalizedObjectKey, // Use normalized key for consistency
- VersionId: version.VersionId,
- IsLatest: version.IsLatest,
- LastModified: version.LastModified,
- ETag: version.ETag,
- Size: version.Size,
- Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
+ Key: directoryKey,
+ VersionId: "null",
+ IsLatest: true,
+ LastModified: time.Unix(entry.Attributes.Mtime, 0),
+ ETag: directoryETag,
+ Size: 0, // Directories have size 0
+ Owner: s3a.getObjectOwnerFromEntry(entry),
StorageClass: "STANDARD",
}
*allVersions = append(*allVersions, versionEntry)
}
- }
- } else {
- // This is a regular directory - check if it's an explicit S3 directory object
- // Only include directories that were explicitly created via S3 API (have FolderMimeType)
- // This excludes implicit directories created when uploading files like "test1/a"
- if entry.Attributes.Mime == s3_constants.FolderMimeType {
- directoryKey := entryPath
- if !strings.HasSuffix(directoryKey, "/") {
- directoryKey += "/"
- }
- // Add directory as a version entry with VersionId "null" (following S3/Minio behavior)
- glog.V(2).Infof("findVersionsRecursively: found explicit S3 directory %s", directoryKey)
-
- // Calculate ETag for empty directory
- directoryETag := "\"d41d8cd98f00b204e9800998ecf8427e\""
-
- versionEntry := &VersionEntry{
- Key: directoryKey,
- VersionId: "null",
- IsLatest: true,
- LastModified: time.Unix(entry.Attributes.Mtime, 0),
- ETag: directoryETag,
- Size: 0, // Directories have size 0
- Owner: s3a.getObjectOwnerFromEntry(entry),
- StorageClass: "STANDARD",
+ // Recursively search subdirectories (regardless of whether they're explicit or implicit)
+ fullPath := path.Join(currentPath, entry.Name)
+ err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix)
+ if err != nil {
+ glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
+ continue
}
- *allVersions = append(*allVersions, versionEntry)
}
+ } else {
+ // This is a regular file - check if it's a pre-versioning object
+ objectKey := entryPath
+
+ // Normalize object key to ensure consistency with other version operations
+ normalizedObjectKey := removeDuplicateSlashes(objectKey)
- // Recursively search subdirectories (regardless of whether they're explicit or implicit)
- fullPath := path.Join(currentPath, entry.Name)
- err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix)
- if err != nil {
- glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
+ // Skip if this object already has a .versions directory (already processed)
+ // Check both normalized and original keys for backward compatibility
+ if processedObjects[objectKey] || processedObjects[normalizedObjectKey] {
+ glog.V(4).Infof("Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v",
+ objectKey, normalizedObjectKey, processedObjects[objectKey], processedObjects[normalizedObjectKey])
continue
}
- }
- } else {
- // This is a regular file - check if it's a pre-versioning object
- objectKey := entryPath
-
- // Normalize object key to ensure consistency with other version operations
- normalizedObjectKey := removeDuplicateSlashes(objectKey)
-
- // Skip if this object already has a .versions directory (already processed)
- // Check both normalized and original keys for backward compatibility
- if processedObjects[objectKey] || processedObjects[normalizedObjectKey] {
- glog.V(4).Infof("Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v",
- objectKey, normalizedObjectKey, processedObjects[objectKey], processedObjects[normalizedObjectKey])
- continue
- }
- glog.V(4).Infof("Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects", objectKey, normalizedObjectKey)
+ glog.V(4).Infof("Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects", objectKey, normalizedObjectKey)
- // This is a pre-versioning or suspended-versioning object
- // Check if this file has version metadata (ExtVersionIdKey)
- hasVersionMeta := false
- if entry.Extended != nil {
- if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
- hasVersionMeta = true
- glog.V(4).Infof("Regular file %s has version metadata: %s", normalizedObjectKey, string(versionIdBytes))
+ // This is a pre-versioning or suspended-versioning object
+ // Check if this file has version metadata (ExtVersionIdKey)
+ hasVersionMeta := false
+ if entry.Extended != nil {
+ if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
+ hasVersionMeta = true
+ glog.V(4).Infof("Regular file %s has version metadata: %s", normalizedObjectKey, string(versionIdBytes))
+ }
}
- }
- // Check if a .versions directory exists for this object
- versionsObjectPath := normalizedObjectKey + s3_constants.VersionsFolder
- _, versionsErr := s3a.getEntry(currentPath, versionsObjectPath)
- if versionsErr == nil {
- // .versions directory exists
- glog.V(4).Infof("Found .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
-
- // If this file has version metadata, it's a suspended versioning null version
- // Include it and it will be the latest
- if hasVersionMeta {
- glog.V(4).Infof("Including suspended versioning file %s (has version metadata)", normalizedObjectKey)
- // Continue to add it below
- } else {
- // No version metadata - this is a pre-versioning file
- // Skip it if there's already a null version in .versions
- versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
- if err == nil {
- hasNullVersion := false
- for _, v := range versions {
- if v.VersionId == "null" {
- hasNullVersion = true
- break
+ // Check if a .versions directory exists for this object
+ versionsObjectPath := normalizedObjectKey + s3_constants.VersionsFolder
+ _, versionsErr := s3a.getEntry(currentPath, versionsObjectPath)
+ if versionsErr == nil {
+ // .versions directory exists
+ glog.V(4).Infof("Found .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
+
+ // If this file has version metadata, it's a suspended versioning null version
+ // Include it and it will be the latest
+ if hasVersionMeta {
+ glog.V(4).Infof("Including suspended versioning file %s (has version metadata)", normalizedObjectKey)
+ // Continue to add it below
+ } else {
+ // No version metadata - this is a pre-versioning file
+ // Skip it if there's already a null version in .versions
+ versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
+ if err == nil {
+ hasNullVersion := false
+ for _, v := range versions {
+ if v.VersionId == "null" {
+ hasNullVersion = true
+ break
+ }
+ }
+ if hasNullVersion {
+ glog.V(4).Infof("Skipping pre-versioning file %s, null version exists in .versions", normalizedObjectKey)
+ processedObjects[objectKey] = true
+ processedObjects[normalizedObjectKey] = true
+ continue
}
}
- if hasNullVersion {
- glog.V(4).Infof("Skipping pre-versioning file %s, null version exists in .versions", normalizedObjectKey)
- processedObjects[objectKey] = true
- processedObjects[normalizedObjectKey] = true
- continue
- }
+ glog.V(4).Infof("Including pre-versioning file %s (no null version in .versions)", normalizedObjectKey)
}
- glog.V(4).Infof("Including pre-versioning file %s (no null version in .versions)", normalizedObjectKey)
+ } else {
+ glog.V(4).Infof("No .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
}
- } else {
- glog.V(4).Infof("No .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
- }
- // Add this file as a null version with IsLatest=true
- isLatest := true
+ // Add this file as a null version with IsLatest=true
+ isLatest := true
- // Check for duplicate version IDs and skip if already seen
- // Use normalized key for deduplication to match how other version operations work
- versionKey := normalizedObjectKey + ":null"
- if seenVersionIds[versionKey] {
- glog.Warningf("findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping", normalizedObjectKey, versionKey)
- continue
- }
- seenVersionIds[versionKey] = true
-
- etag := s3a.calculateETagFromChunks(entry.Chunks)
-
- glog.V(4).Infof("Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v",
- objectKey, normalizedObjectKey, versionKey, isLatest, hasVersionMeta)
-
- versionEntry := &VersionEntry{
- Key: normalizedObjectKey, // Use normalized key for consistency
- VersionId: "null",
- IsLatest: isLatest,
- LastModified: time.Unix(entry.Attributes.Mtime, 0),
- ETag: etag,
- Size: int64(entry.Attributes.FileSize),
- Owner: s3a.getObjectOwnerFromEntry(entry),
- StorageClass: "STANDARD",
+ // Check for duplicate version IDs and skip if already seen
+ // Use normalized key for deduplication to match how other version operations work
+ versionKey := normalizedObjectKey + ":null"
+ if seenVersionIds[versionKey] {
+ glog.Warningf("findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping", normalizedObjectKey, versionKey)
+ continue
+ }
+ seenVersionIds[versionKey] = true
+
+ etag := s3a.calculateETagFromChunks(entry.Chunks)
+
+ glog.V(4).Infof("Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v",
+ objectKey, normalizedObjectKey, versionKey, isLatest, hasVersionMeta)
+
+ versionEntry := &VersionEntry{
+ Key: normalizedObjectKey, // Use normalized key for consistency
+ VersionId: "null",
+ IsLatest: isLatest,
+ LastModified: time.Unix(entry.Attributes.Mtime, 0),
+ ETag: etag,
+ Size: int64(entry.Attributes.FileSize),
+ Owner: s3a.getObjectOwnerFromEntry(entry),
+ StorageClass: "STANDARD",
+ }
+ *allVersions = append(*allVersions, versionEntry)
}
- *allVersions = append(*allVersions, versionEntry)
+ }
+
+ // If we've reached the last page, stop pagination
+ if isLast {
+ break
}
}