aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_versioning.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_versioning.go')
-rw-r--r--weed/s3api/s3api_object_versioning.go215
1 files changed, 165 insertions, 50 deletions
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go
index e9802d71c..4f1ff901f 100644
--- a/weed/s3api/s3api_object_versioning.go
+++ b/weed/s3api/s3api_object_versioning.go
@@ -151,6 +151,8 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*S3ListObjectVersionsResult, error) {
var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
+ glog.V(1).Infof("listObjectVersions: listing versions for bucket %s, prefix '%s'", bucket, prefix)
+
// Track objects that have been processed to avoid duplicates
processedObjects := make(map[string]bool)
@@ -161,9 +163,12 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
bucketPath := path.Join(s3a.option.BucketsPath, bucket)
err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix)
if err != nil {
+ glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err)
return nil, err
}
+ glog.V(1).Infof("listObjectVersions: found %d total versions", len(allVersions))
+
// Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering
sort.Slice(allVersions, func(i, j int) bool {
var keyI, keyJ string
@@ -218,6 +223,8 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
IsTruncated: len(allVersions) > maxKeys,
}
+ glog.V(1).Infof("listObjectVersions: building response with %d versions (truncated: %v)", len(allVersions), result.IsTruncated)
+
// Limit results
if len(allVersions) > maxKeys {
allVersions = allVersions[:maxKeys]
@@ -239,15 +246,19 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
result.DeleteMarkers = make([]DeleteMarkerEntry, 0)
// Add versions to result
- for _, version := range allVersions {
+ for i, version := range allVersions {
switch v := version.(type) {
case *VersionEntry:
+ glog.V(2).Infof("listObjectVersions: adding version %d: key=%s, versionId=%s", i, v.Key, v.VersionId)
result.Versions = append(result.Versions, *v)
case *DeleteMarkerEntry:
+ glog.V(2).Infof("listObjectVersions: adding delete marker %d: key=%s, versionId=%s", i, v.Key, v.VersionId)
result.DeleteMarkers = append(result.DeleteMarkers, *v)
}
}
+ glog.V(1).Infof("listObjectVersions: final result - %d versions, %d delete markers", len(result.Versions), len(result.DeleteMarkers))
+
return result, nil
}
@@ -293,43 +304,51 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
if strings.HasSuffix(entry.Name, ".versions") {
// Extract object name from .versions directory name
objectKey := strings.TrimSuffix(entryPath, ".versions")
+ normalizedObjectKey := removeDuplicateSlashes(objectKey)
+ // Mark both keys as processed for backward compatibility
processedObjects[objectKey] = true
+ processedObjects[normalizedObjectKey] = true
- glog.V(2).Infof("findVersionsRecursively: found .versions directory for object %s", objectKey)
+ glog.V(2).Infof("Found .versions directory for object %s (normalized: %s)", objectKey, normalizedObjectKey)
- versions, err := s3a.getObjectVersionList(bucket, objectKey)
+ versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
if err != nil {
- glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
+ glog.Warningf("Failed to get versions for object %s (normalized: %s): %v", objectKey, normalizedObjectKey, err)
continue
}
for _, version := range versions {
// Check for duplicate version IDs and skip if already seen
- versionKey := objectKey + ":" + version.VersionId
+ // Use normalized key for deduplication
+ versionKey := normalizedObjectKey + ":" + version.VersionId
if seenVersionIds[versionKey] {
- glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, objectKey)
+ glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, normalizedObjectKey)
continue
}
seenVersionIds[versionKey] = true
if version.IsDeleteMarker {
+ glog.V(0).Infof("Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
+ normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
deleteMarker := &DeleteMarkerEntry{
- Key: objectKey,
+ Key: normalizedObjectKey, // Use normalized key for consistency
VersionId: version.VersionId,
IsLatest: version.IsLatest,
LastModified: version.LastModified,
- Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey),
+ Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
}
*allVersions = append(*allVersions, deleteMarker)
} else {
+ glog.V(0).Infof("Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s",
+ normalizedObjectKey, version.VersionId, version.IsLatest, versionKey)
versionEntry := &VersionEntry{
- Key: objectKey,
+ Key: normalizedObjectKey, // Use normalized key for consistency
VersionId: version.VersionId,
IsLatest: version.IsLatest,
LastModified: version.LastModified,
ETag: version.ETag,
Size: version.Size,
- Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey),
+ Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey),
StorageClass: "STANDARD",
}
*allVersions = append(*allVersions, versionEntry)
@@ -376,32 +395,85 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string
// This is a regular file - check if it's a pre-versioning object
objectKey := entryPath
+ // Normalize object key to ensure consistency with other version operations
+ normalizedObjectKey := removeDuplicateSlashes(objectKey)
+
// Skip if this object already has a .versions directory (already processed)
- if processedObjects[objectKey] {
+ // Check both normalized and original keys for backward compatibility
+ if processedObjects[objectKey] || processedObjects[normalizedObjectKey] {
+ glog.V(0).Infof("Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v",
+ objectKey, normalizedObjectKey, processedObjects[objectKey], processedObjects[normalizedObjectKey])
continue
}
- // This is a pre-versioning object - treat it as a version with VersionId="null"
- glog.V(2).Infof("findVersionsRecursively: found pre-versioning object %s", objectKey)
+ glog.V(0).Infof("Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects", objectKey, normalizedObjectKey)
- // Check if this null version should be marked as latest
- // It's only latest if there's no .versions directory OR no latest version metadata
- isLatest := true
- versionsObjectPath := objectKey + ".versions"
- if versionsEntry, err := s3a.getEntry(currentPath, versionsObjectPath); err == nil {
- // .versions directory exists, check if there's latest version metadata
- if versionsEntry.Extended != nil {
- if _, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
- // There is a latest version in the .versions directory, so null is not latest
- isLatest = false
- glog.V(2).Infof("findVersionsRecursively: null version for %s is not latest due to versioned objects", objectKey)
+ // This is a pre-versioning or suspended-versioning object
+ // Check if this file has version metadata (ExtVersionIdKey)
+ hasVersionMeta := false
+ if entry.Extended != nil {
+ if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
+ hasVersionMeta = true
+ glog.V(0).Infof("Regular file %s has version metadata: %s", normalizedObjectKey, string(versionIdBytes))
+ }
+ }
+
+ // Check if a .versions directory exists for this object
+ versionsObjectPath := normalizedObjectKey + ".versions"
+ _, versionsErr := s3a.getEntry(currentPath, versionsObjectPath)
+ if versionsErr == nil {
+ // .versions directory exists
+ glog.V(0).Infof("Found .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
+
+ // If this file has version metadata, it's a suspended versioning null version
+ // Include it and it will be the latest
+ if hasVersionMeta {
+ glog.V(0).Infof("Including suspended versioning file %s (has version metadata)", normalizedObjectKey)
+ // Continue to add it below
+ } else {
+ // No version metadata - this is a pre-versioning file
+ // Skip it if there's already a null version in .versions
+ versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey)
+ if err == nil {
+ hasNullVersion := false
+ for _, v := range versions {
+ if v.VersionId == "null" {
+ hasNullVersion = true
+ break
+ }
+ }
+ if hasNullVersion {
+ glog.V(0).Infof("Skipping pre-versioning file %s, null version exists in .versions", normalizedObjectKey)
+ processedObjects[objectKey] = true
+ processedObjects[normalizedObjectKey] = true
+ continue
+ }
}
+ glog.V(0).Infof("Including pre-versioning file %s (no null version in .versions)", normalizedObjectKey)
}
+ } else {
+ glog.V(0).Infof("No .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta)
+ }
+
+ // Add this file as a null version with IsLatest=true
+ isLatest := true
+
+ // Check for duplicate version IDs and skip if already seen
+ // Use normalized key for deduplication to match how other version operations work
+ versionKey := normalizedObjectKey + ":null"
+ if seenVersionIds[versionKey] {
+ glog.Warningf("findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping", normalizedObjectKey, versionKey)
+ continue
}
+ seenVersionIds[versionKey] = true
etag := s3a.calculateETagFromChunks(entry.Chunks)
+
+ glog.V(0).Infof("Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v",
+ objectKey, normalizedObjectKey, versionKey, isLatest, hasVersionMeta)
+
versionEntry := &VersionEntry{
- Key: objectKey,
+ Key: normalizedObjectKey, // Use normalized key for consistency
VersionId: "null",
IsLatest: isLatest,
LastModified: time.Unix(entry.Attributes.Mtime, 0),
@@ -535,23 +607,26 @@ func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) st
// getSpecificObjectVersion retrieves a specific version of an object
func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) {
+ // Normalize object path to ensure consistency with toFilerUrl behavior
+ normalizedObject := removeDuplicateSlashes(object)
+
if versionId == "" {
// Get current version
- return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
+ return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(normalizedObject, "/"))
}
if versionId == "null" {
// "null" version ID refers to pre-versioning objects stored as regular files
bucketDir := s3a.option.BucketsPath + "/" + bucket
- entry, err := s3a.getEntry(bucketDir, object)
+ entry, err := s3a.getEntry(bucketDir, normalizedObject)
if err != nil {
- return nil, fmt.Errorf("null version object %s not found: %v", object, err)
+ return nil, fmt.Errorf("null version object %s not found: %v", normalizedObject, err)
}
return entry, nil
}
// Get specific version from .versions directory
- versionsDir := s3a.getVersionedObjectDir(bucket, object)
+ versionsDir := s3a.getVersionedObjectDir(bucket, normalizedObject)
versionFile := s3a.getVersionFileName(versionId)
entry, err := s3a.getEntry(versionsDir, versionFile)
@@ -564,6 +639,9 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin
// deleteSpecificObjectVersion deletes a specific version of an object
func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error {
+ // Normalize object path to ensure consistency with toFilerUrl behavior
+ normalizedObject := removeDuplicateSlashes(object)
+
if versionId == "" {
return fmt.Errorf("version ID is required for version-specific deletion")
}
@@ -571,7 +649,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
if versionId == "null" {
// Delete "null" version (pre-versioning object stored as regular file)
bucketDir := s3a.option.BucketsPath + "/" + bucket
- cleanObject := strings.TrimPrefix(object, "/")
+ cleanObject := strings.TrimPrefix(normalizedObject, "/")
// Check if the object exists
_, err := s3a.getEntry(bucketDir, cleanObject)
@@ -594,11 +672,11 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
return nil
}
- versionsDir := s3a.getVersionedObjectDir(bucket, object)
+ versionsDir := s3a.getVersionedObjectDir(bucket, normalizedObject)
versionFile := s3a.getVersionFileName(versionId)
// Check if this is the latest version before attempting deletion (for potential metadata update)
- versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions")
+ versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject+".versions")
isLatestVersion := false
if dirErr == nil && versionsEntry.Extended != nil {
if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
@@ -765,39 +843,76 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
+ // Normalize object path to ensure consistency with toFilerUrl behavior
+ normalizedObject := removeDuplicateSlashes(object)
+
bucketDir := s3a.option.BucketsPath + "/" + bucket
- versionsObjectPath := object + ".versions"
+ versionsObjectPath := normalizedObject + ".versions"
+
+ glog.V(1).Infof("getLatestObjectVersion: looking for latest version of %s/%s (normalized: %s)", bucket, object, normalizedObject)
+
+ // Get the .versions directory entry to read latest version metadata with retry logic for filer consistency
+ var versionsEntry *filer_pb.Entry
+ var err error
+ maxRetries := 8
+ for attempt := 1; attempt <= maxRetries; attempt++ {
+ versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath)
+ if err == nil {
+ break
+ }
+
+ if attempt < maxRetries {
+ // Exponential backoff with higher base: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 6400ms
+ delay := time.Millisecond * time.Duration(100*(1<<(attempt-1)))
+ time.Sleep(delay)
+ }
+ }
- // Get the .versions directory entry to read latest version metadata
- versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err != nil {
// .versions directory doesn't exist - this can happen for objects that existed
// before versioning was enabled on the bucket. Fall back to checking for a
// regular (non-versioned) object file.
- glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s%s, checking for pre-versioning object", bucket, object)
+ glog.V(1).Infof("getLatestObjectVersion: no .versions directory for %s%s after %d attempts (error: %v), checking for pre-versioning object", bucket, normalizedObject, maxRetries, err)
- regularEntry, regularErr := s3a.getEntry(bucketDir, object)
+ regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
if regularErr != nil {
- return nil, fmt.Errorf("failed to get %s%s .versions directory and no regular object found: %w", bucket, object, err)
+ glog.V(1).Infof("getLatestObjectVersion: no pre-versioning object found for %s%s (error: %v)", bucket, normalizedObject, regularErr)
+ return nil, fmt.Errorf("failed to get %s%s .versions directory and no regular object found: %w", bucket, normalizedObject, err)
}
- glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, object)
+ glog.V(1).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, normalizedObject)
return regularEntry, nil
}
- // Check if directory has latest version metadata
+ // Check if directory has latest version metadata - retry if missing due to race condition
if versionsEntry.Extended == nil {
- // No metadata means all versioned objects have been deleted.
- // Fall back to checking for a pre-versioning object.
- glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s%s, checking for pre-versioning object", bucket, object)
+ // Retry a few times to handle the race condition where directory exists but metadata is not yet written
+ metadataRetries := 3
+ for metaAttempt := 1; metaAttempt <= metadataRetries; metaAttempt++ {
+ // Small delay and re-read the directory
+ time.Sleep(time.Millisecond * 100)
+ versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath)
+ if err != nil {
+ break
+ }
- regularEntry, regularErr := s3a.getEntry(bucketDir, object)
- if regularErr != nil {
- return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object)
+ if versionsEntry.Extended != nil {
+ break
+ }
}
- glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s (no Extended metadata case)", bucket, object)
- return regularEntry, nil
+ // If still no metadata after retries, fall back to pre-versioning object
+ if versionsEntry.Extended == nil {
+ glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s%s after retries, checking for pre-versioning object", bucket, object)
+
+ regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
+ if regularErr != nil {
+ return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, normalizedObject)
+ }
+
+ glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s (no Extended metadata case)", bucket, object)
+ return regularEntry, nil
+ }
}
latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
@@ -808,9 +923,9 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb
// Fall back to checking for a pre-versioning object.
glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
- regularEntry, regularErr := s3a.getEntry(bucketDir, object)
+ regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
if regularErr != nil {
- return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object)
+ return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, normalizedObject)
}
glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s after version deletion", bucket, object)