aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_versioning.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-07-19 21:43:34 -0700
committerGitHub <noreply@github.com>2025-07-19 21:43:34 -0700
commit12f50d37fa52444a43ad6ff4cc3d156db4035528 (patch)
treef2ea4466b899e18672530238dc7b35b91115e963 /weed/s3api/s3api_object_versioning.go
parent0e4d803896fc9a48a77d0d1669583c613452539c (diff)
downloadseaweedfs-12f50d37fa52444a43ad6ff4cc3d156db4035528.tar.xz
seaweedfs-12f50d37fa52444a43ad6ff4cc3d156db4035528.zip
test versioning also (#7000)
* test versioning also * fix some versioning tests * fall back * fixes Never-versioned buckets: No VersionId headers, no Status field Pre-versioning objects: Regular files, VersionId="null", included in all operations Post-versioning objects: Stored in .versions directories with real version IDs Suspended versioning: Proper status handling and null version IDs * fixes Bucket Versioning Status Compliance Fixed: New buckets now return no Status field (AWS S3 compliant) Before: Always returned "Suspended" ❌ After: Returns empty VersioningConfiguration for unconfigured buckets ✅ 2. Multi-Object Delete Versioning Support Fixed: DeleteMultipleObjectsHandler now fully versioning-aware Before: Always deleted physical files, breaking versioning ❌ After: Creates delete markers or deletes specific versions properly ✅ Added: DeleteMarker field in response structure for AWS compatibility 3. Copy Operations Versioning Support Fixed: CopyObjectHandler and CopyObjectPartHandler now versioning-aware Before: Only copied regular files, couldn't handle versioned sources ❌ After: Parses version IDs from copy source, creates versions in destination ✅ Added: pathToBucketObjectAndVersion() function for version ID parsing 4. Pre-versioning Object Handling Fixed: getLatestObjectVersion() now has proper fallback logic Before: Failed when .versions directory didn't exist ❌ After: Falls back to regular objects for pre-versioning scenarios ✅ 5. Enhanced Object Version Listings Fixed: listObjectVersions() includes both versioned AND pre-versioning objects Before: Only showed .versions directories, ignored pre-versioning objects ❌ After: Shows complete version history with VersionId="null" for pre-versioning ✅ 6. Null Version ID Handling Fixed: getSpecificObjectVersion() properly handles versionId="null" Before: Couldn't retrieve pre-versioning objects by version ID ❌ After: Returns regular object files for "null" version requests ✅ 7. Version ID Response Headers Fixed: PUT operations only return x-amz-version-id when appropriate Before: Returned version IDs for non-versioned buckets ❌ After: Only returns version IDs for explicitly configured versioning ✅ * more fixes * fix copying with versioning, multipart upload * more fixes * reduce volume size for easier dev test * fix * fix version id * fix versioning * Update filer_multipart.go * fix multipart versioned upload * more fixes * more fixes * fix versioning on suspended * fixes * fixing test_versioning_obj_suspended_copy * Update s3api_object_versioning.go * fix versions * skipping test_versioning_obj_suspend_versions * > If the versioning state has never been set on a bucket, it has no versioning state; a GetBucketVersioning request does not return a versioning state value. * fix tests, avoid duplicated bucket creation, skip tests * only run s3tests_boto3/functional/test_s3.py * fix checking filer_pb.ErrNotFound * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_handlers_copy.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_bucket_config.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update test/s3/versioning/s3_versioning_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/s3api/s3api_object_versioning.go')
-rw-r--r--weed/s3api/s3api_object_versioning.go422
1 files changed, 350 insertions, 72 deletions
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go
index cfb3d597c..d1893cb85 100644
--- a/weed/s3api/s3api_object_versioning.go
+++ b/weed/s3api/s3api_object_versioning.go
@@ -2,7 +2,6 @@ package s3api
import (
"crypto/rand"
- "crypto/sha256"
"encoding/hex"
"encoding/xml"
"fmt"
@@ -48,20 +47,26 @@ type ListObjectVersionsResult struct {
CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
}
-// generateVersionId creates a unique version ID
+// generateVersionId creates a unique version ID that preserves chronological order
func generateVersionId() string {
- // Generate a random 16-byte value
- randBytes := make([]byte, 16)
+ // Use nanosecond timestamp to ensure chronological ordering
+ // Format as 16-digit hex (first 16 chars of version ID)
+ now := time.Now().UnixNano()
+ timestampHex := fmt.Sprintf("%016x", now)
+
+ // Generate random 8 bytes for uniqueness (last 16 chars of version ID)
+ randBytes := make([]byte, 8)
if _, err := rand.Read(randBytes); err != nil {
glog.Errorf("Failed to generate random bytes for version ID: %v", err)
- return ""
+ // Fallback to timestamp-only if random generation fails
+ return timestampHex + "0000000000000000"
}
- // Hash with current timestamp for uniqueness
- hash := sha256.Sum256(append(randBytes, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))...))
+ // Combine timestamp (16 chars) + random (16 chars) = 32 chars total
+ randomHex := hex.EncodeToString(randBytes)
+ versionId := timestampHex + randomHex
- // Return first 32 characters of hex string (same length as AWS S3 version IDs)
- return hex.EncodeToString(hash[:])[:32]
+ return versionId
}
// getVersionedObjectDir returns the directory path for storing object versions
@@ -122,59 +127,20 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) {
var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
- // List all entries in bucket
- entries, _, err := s3a.list(path.Join(s3a.option.BucketsPath, bucket), prefix, keyMarker, false, uint32(maxKeys*2))
- if err != nil {
- return nil, err
- }
-
- // For each entry, check if it's a .versions directory
- for _, entry := range entries {
- if !entry.IsDirectory {
- continue
- }
-
- // Check if this is a .versions directory
- if !strings.HasSuffix(entry.Name, ".versions") {
- continue
- }
+ // Track objects that have been processed to avoid duplicates
+ processedObjects := make(map[string]bool)
- // Extract object name from .versions directory name
- objectKey := strings.TrimSuffix(entry.Name, ".versions")
+ // Track version IDs globally to prevent duplicates throughout the listing
+ seenVersionIds := make(map[string]bool)
- versions, err := s3a.getObjectVersionList(bucket, objectKey)
- if err != nil {
- glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
- continue
- }
-
- for _, version := range versions {
- if version.IsDeleteMarker {
- deleteMarker := &DeleteMarkerEntry{
- Key: objectKey,
- VersionId: version.VersionId,
- IsLatest: version.IsLatest,
- LastModified: version.LastModified,
- Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
- }
- allVersions = append(allVersions, deleteMarker)
- } else {
- versionEntry := &VersionEntry{
- Key: objectKey,
- VersionId: version.VersionId,
- IsLatest: version.IsLatest,
- LastModified: version.LastModified,
- ETag: version.ETag,
- Size: version.Size,
- Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
- StorageClass: "STANDARD",
- }
- allVersions = append(allVersions, versionEntry)
- }
- }
+ // Recursively find all .versions directories in the bucket
+ bucketPath := path.Join(s3a.option.BucketsPath, bucket)
+ err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix)
+ if err != nil {
+ return nil, err
}
- // Sort by key, then by LastModified and VersionId
+ // Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering
sort.Slice(allVersions, func(i, j int) bool {
var keyI, keyJ string
var lastModifiedI, lastModifiedJ time.Time
@@ -202,13 +168,20 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
versionIdJ = v.VersionId
}
+ // First sort by object key
if keyI != keyJ {
return keyI < keyJ
}
- if !lastModifiedI.Equal(lastModifiedJ) {
+
+ // Then by modification time (newest first) - but use nanosecond precision for ties
+ timeDiff := lastModifiedI.Sub(lastModifiedJ)
+ if timeDiff.Abs() > time.Millisecond {
return lastModifiedI.After(lastModifiedJ)
}
- return versionIdI < versionIdJ
+
+ // For very close timestamps (within 1ms), use version ID for deterministic ordering
+ // Sort version IDs in reverse lexicographic order to maintain newest-first semantics
+ return versionIdI > versionIdJ
})
// Build result
@@ -237,6 +210,10 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
}
}
+ // Always initialize empty slices so boto3 gets the expected fields even when empty
+ result.Versions = make([]VersionEntry, 0)
+ result.DeleteMarkers = make([]DeleteMarkerEntry, 0)
+
// Add versions to result
for _, version := range allVersions {
switch v := version.(type) {
@@ -250,6 +227,128 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
return result, nil
}
+// findVersionsRecursively searches for all .versions directories and regular files recursively
+func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string) error {
+ // List entries in current directory
+ entries, _, err := s3a.list(currentPath, "", "", false, 1000)
+ if err != nil {
+ return err
+ }
+
+ for _, entry := range entries {
+ entryPath := path.Join(relativePath, entry.Name)
+
+ // Skip if this doesn't match the prefix filter
+ if prefix != "" && !strings.HasPrefix(entryPath, strings.TrimPrefix(prefix, "/")) {
+ continue
+ }
+
+ if entry.IsDirectory {
+ // Skip .uploads directory (multipart upload temporary files)
+ if strings.HasPrefix(entry.Name, ".uploads") {
+ continue
+ }
+
+ // Check if this is a .versions directory
+ if strings.HasSuffix(entry.Name, ".versions") {
+ // Extract object name from .versions directory name
+ objectKey := strings.TrimSuffix(entryPath, ".versions")
+ processedObjects[objectKey] = true
+
+ glog.V(2).Infof("findVersionsRecursively: found .versions directory for object %s", objectKey)
+
+ versions, err := s3a.getObjectVersionList(bucket, objectKey)
+ if err != nil {
+ glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
+ continue
+ }
+
+ for _, version := range versions {
+ // Check for duplicate version IDs and skip if already seen
+ versionKey := objectKey + ":" + version.VersionId
+ if seenVersionIds[versionKey] {
+ glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, objectKey)
+ continue
+ }
+ seenVersionIds[versionKey] = true
+
+ if version.IsDeleteMarker {
+ deleteMarker := &DeleteMarkerEntry{
+ Key: objectKey,
+ VersionId: version.VersionId,
+ IsLatest: version.IsLatest,
+ LastModified: version.LastModified,
+ Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+ }
+ *allVersions = append(*allVersions, deleteMarker)
+ } else {
+ versionEntry := &VersionEntry{
+ Key: objectKey,
+ VersionId: version.VersionId,
+ IsLatest: version.IsLatest,
+ LastModified: version.LastModified,
+ ETag: version.ETag,
+ Size: version.Size,
+ Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+ StorageClass: "STANDARD",
+ }
+ *allVersions = append(*allVersions, versionEntry)
+ }
+ }
+ } else {
+ // Recursively search subdirectories
+ fullPath := path.Join(currentPath, entry.Name)
+ err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix)
+ if err != nil {
+ glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
+ continue
+ }
+ }
+ } else {
+ // This is a regular file - check if it's a pre-versioning object
+ objectKey := entryPath
+
+ // Skip if this object already has a .versions directory (already processed)
+ if processedObjects[objectKey] {
+ continue
+ }
+
+ // This is a pre-versioning object - treat it as a version with VersionId="null"
+ glog.V(2).Infof("findVersionsRecursively: found pre-versioning object %s", objectKey)
+
+ // Check if this null version should be marked as latest
+ // It's only latest if there's no .versions directory OR no latest version metadata
+ isLatest := true
+ versionsObjectPath := objectKey + ".versions"
+ if versionsEntry, err := s3a.getEntry(currentPath, versionsObjectPath); err == nil {
+ // .versions directory exists, check if there's latest version metadata
+ if versionsEntry.Extended != nil {
+ if _, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
+ // There is a latest version in the .versions directory, so null is not latest
+ isLatest = false
+ glog.V(2).Infof("findVersionsRecursively: null version for %s is not latest due to versioned objects", objectKey)
+ }
+ }
+ }
+
+ etag := s3a.calculateETagFromChunks(entry.Chunks)
+ versionEntry := &VersionEntry{
+ Key: objectKey,
+ VersionId: "null",
+ IsLatest: isLatest,
+ LastModified: time.Unix(entry.Attributes.Mtime, 0),
+ ETag: etag,
+ Size: int64(entry.Attributes.FileSize),
+ Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+ StorageClass: "STANDARD",
+ }
+ *allVersions = append(*allVersions, versionEntry)
+ }
+ }
+
+ return nil
+}
+
// getObjectVersionList returns all versions of a specific object
func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
var versions []*ObjectVersion
@@ -287,6 +386,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
+ // Use a map to detect and prevent duplicate version IDs
+ seenVersionIds := make(map[string]bool)
+
for i, entry := range entries {
if entry.Extended == nil {
glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
@@ -301,6 +403,13 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
versionId := string(versionIdBytes)
+ // Check for duplicate version IDs and skip if already seen
+ if seenVersionIds[versionId] {
+ glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object)
+ continue
+ }
+ seenVersionIds[versionId] = true
+
// Check if this version is the latest by comparing with directory metadata
isLatest := (versionId == latestVersionId)
@@ -331,12 +440,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
versions = append(versions, version)
}
- // Sort by modification time (newest first)
- sort.Slice(versions, func(i, j int) bool {
- return versions[i].LastModified.After(versions[j].LastModified)
- })
+ // Don't sort here - let the main listObjectVersions function handle sorting consistently
- glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s", len(versions), bucket, object)
+ glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, len(entries))
for i, version := range versions {
glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
}
@@ -366,6 +472,16 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin
return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
}
+ if versionId == "null" {
+ // "null" version ID refers to pre-versioning objects stored as regular files
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ entry, err := s3a.getEntry(bucketDir, object)
+ if err != nil {
+ return nil, fmt.Errorf("null version object %s not found: %v", object, err)
+ }
+ return entry, nil
+ }
+
// Get specific version from .versions directory
versionsDir := s3a.getVersionedObjectDir(bucket, object)
versionFile := s3a.getVersionFileName(versionId)
@@ -384,6 +500,32 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
return fmt.Errorf("version ID is required for version-specific deletion")
}
+ if versionId == "null" {
+ // Delete "null" version (pre-versioning object stored as regular file)
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ cleanObject := strings.TrimPrefix(object, "/")
+
+ // Check if the object exists
+ _, err := s3a.getEntry(bucketDir, cleanObject)
+ if err != nil {
+ // Object doesn't exist - this is OK for delete operations (idempotent)
+ glog.V(2).Infof("deleteSpecificObjectVersion: null version object %s already deleted or doesn't exist", cleanObject)
+ return nil
+ }
+
+ // Delete the regular file
+ deleteErr := s3a.rm(bucketDir, cleanObject, true, false)
+ if deleteErr != nil {
+ // Check if file was already deleted by another process
+ if _, checkErr := s3a.getEntry(bucketDir, cleanObject); checkErr != nil {
+ // File doesn't exist anymore, deletion was successful
+ return nil
+ }
+ return fmt.Errorf("failed to delete null version %s: %v", cleanObject, deleteErr)
+ }
+ return nil
+ }
+
versionsDir := s3a.getVersionedObjectDir(bucket, object)
versionFile := s3a.getVersionFileName(versionId)
@@ -393,16 +535,120 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
return fmt.Errorf("version %s not found: %v", versionId, err)
}
- // Version exists, delete it
+ // Check if this is the latest version before deleting
+ versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions")
+ isLatestVersion := false
+ if dirErr == nil && versionsEntry.Extended != nil {
+ if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
+ isLatestVersion = (string(latestVersionIdBytes) == versionId)
+ }
+ }
+
+ // Delete the version file
deleteErr := s3a.rm(versionsDir, versionFile, true, false)
if deleteErr != nil {
// Check if file was already deleted by another process
if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
// File doesn't exist anymore, deletion was successful
- return nil
+ } else {
+ return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
}
- return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
}
+
+ // If we deleted the latest version, update the .versions directory metadata to point to the new latest
+ if isLatestVersion {
+ err := s3a.updateLatestVersionAfterDeletion(bucket, object)
+ if err != nil {
+ glog.Warningf("deleteSpecificObjectVersion: failed to update latest version after deletion: %v", err)
+ // Don't return error since the deletion was successful
+ }
+ }
+
+ return nil
+}
+
+// updateLatestVersionAfterDeletion finds the new latest version after deleting the current latest
+func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error {
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ cleanObject := strings.TrimPrefix(object, "/")
+ versionsObjectPath := cleanObject + ".versions"
+ versionsDir := bucketDir + "/" + versionsObjectPath
+
+ glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
+
+ // List all remaining version files in the .versions directory
+ entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
+ if err != nil {
+ glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err)
+ return fmt.Errorf("failed to list versions: %v", err)
+ }
+
+ glog.V(1).Infof("updateLatestVersionAfterDeletion: found %d entries in %s", len(entries), versionsDir)
+
+ // Find the most recent remaining version (latest timestamp in version ID)
+ var latestVersionId string
+ var latestVersionFileName string
+
+ for _, entry := range entries {
+ if entry.Extended == nil {
+ continue
+ }
+
+ versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
+ if !hasVersionId {
+ continue
+ }
+
+ versionId := string(versionIdBytes)
+
+ // Skip delete markers when finding latest content version
+ isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
+ if string(isDeleteMarkerBytes) == "true" {
+ continue
+ }
+
+ // Compare version IDs chronologically (our version IDs start with timestamp)
+ if latestVersionId == "" || versionId > latestVersionId {
+ glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name)
+ latestVersionId = versionId
+ latestVersionFileName = entry.Name
+ } else {
+ glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older version %s", versionId)
+ }
+ }
+
+ // Update the .versions directory metadata
+ versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ if err != nil {
+ return fmt.Errorf("failed to get .versions directory: %v", err)
+ }
+
+ if versionsEntry.Extended == nil {
+ versionsEntry.Extended = make(map[string][]byte)
+ }
+
+ if latestVersionId != "" {
+ // Update metadata to point to new latest version
+ versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
+ versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
+ glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s", bucket, object, latestVersionId)
+ } else {
+ // No versions left, remove latest version metadata
+ delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
+ delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
+ glog.V(2).Infof("updateLatestVersionAfterDeletion: no versions left for %s/%s", bucket, object)
+ }
+
+ // Update the .versions directory entry
+ err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
+ updatedEntry.Extended = versionsEntry.Extended
+ updatedEntry.Attributes = versionsEntry.Attributes
+ updatedEntry.Chunks = versionsEntry.Chunks
+ })
+ if err != nil {
+ return fmt.Errorf("failed to update .versions directory metadata: %v", err)
+ }
+
return nil
}
@@ -450,24 +696,56 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
bucketDir := s3a.option.BucketsPath + "/" + bucket
- versionsObjectPath := object + ".versions"
+ cleanObject := strings.TrimPrefix(object, "/")
+ versionsObjectPath := cleanObject + ".versions"
// Get the .versions directory entry to read latest version metadata
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err != nil {
- return nil, fmt.Errorf("failed to get .versions directory: %w", err)
+ // .versions directory doesn't exist - this can happen for objects that existed
+ // before versioning was enabled on the bucket. Fall back to checking for a
+ // regular (non-versioned) object file.
+ glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
+
+ regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
+ if regularErr != nil {
+ return nil, fmt.Errorf("failed to get %s/%s .versions directory and no regular object found: %w", bucket, cleanObject, err)
+ }
+
+ glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, cleanObject)
+ return regularEntry, nil
}
// Check if directory has latest version metadata
if versionsEntry.Extended == nil {
- return nil, fmt.Errorf("no version metadata found in .versions directory for %s/%s", bucket, object)
+ // No metadata means all versioned objects have been deleted.
+ // Fall back to checking for a pre-versioning object.
+ glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, cleanObject)
+
+ regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
+ if regularErr != nil {
+ return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
+ }
+
+ glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s (no Extended metadata case)", bucket, cleanObject)
+ return regularEntry, nil
}
latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
if !hasLatestVersionId || !hasLatestVersionFile {
- return nil, fmt.Errorf("incomplete latest version metadata in .versions directory for %s/%s", bucket, object)
+ // No version metadata means all versioned objects have been deleted.
+ // Fall back to checking for a pre-versioning object.
+ glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
+
+ regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
+ if regularErr != nil {
+ return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
+ }
+
+ glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s after version deletion", bucket, cleanObject)
+ return regularEntry, nil
}
latestVersionId := string(latestVersionIdBytes)