diff options
Diffstat (limited to 'weed/s3api/s3api_object_versioning.go')
| -rw-r--r-- | weed/s3api/s3api_object_versioning.go | 215 |
1 files changed, 165 insertions, 50 deletions
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index e9802d71c..4f1ff901f 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -151,6 +151,8 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*S3ListObjectVersionsResult, error) { var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry + glog.V(1).Infof("listObjectVersions: listing versions for bucket %s, prefix '%s'", bucket, prefix) + // Track objects that have been processed to avoid duplicates processedObjects := make(map[string]bool) @@ -161,9 +163,12 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM bucketPath := path.Join(s3a.option.BucketsPath, bucket) err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix) if err != nil { + glog.Errorf("listObjectVersions: findVersionsRecursively failed: %v", err) return nil, err } + glog.V(1).Infof("listObjectVersions: found %d total versions", len(allVersions)) + // Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering sort.Slice(allVersions, func(i, j int) bool { var keyI, keyJ string @@ -218,6 +223,8 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM IsTruncated: len(allVersions) > maxKeys, } + glog.V(1).Infof("listObjectVersions: building response with %d versions (truncated: %v)", len(allVersions), result.IsTruncated) + // Limit results if len(allVersions) > maxKeys { allVersions = allVersions[:maxKeys] @@ -239,15 +246,19 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM result.DeleteMarkers = make([]DeleteMarkerEntry, 0) // Add versions to result - for _, version := range allVersions { + for i, version := range allVersions { switch v := version.(type) { case *VersionEntry: + glog.V(2).Infof("listObjectVersions: adding version %d: key=%s, versionId=%s", i, v.Key, v.VersionId) result.Versions = append(result.Versions, *v) case *DeleteMarkerEntry: + glog.V(2).Infof("listObjectVersions: adding delete marker %d: key=%s, versionId=%s", i, v.Key, v.VersionId) result.DeleteMarkers = append(result.DeleteMarkers, *v) } } + glog.V(1).Infof("listObjectVersions: final result - %d versions, %d delete markers", len(result.Versions), len(result.DeleteMarkers)) + return result, nil } @@ -293,43 +304,51 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string if strings.HasSuffix(entry.Name, ".versions") { // Extract object name from .versions directory name objectKey := strings.TrimSuffix(entryPath, ".versions") + normalizedObjectKey := removeDuplicateSlashes(objectKey) + // Mark both keys as processed for backward compatibility processedObjects[objectKey] = true + processedObjects[normalizedObjectKey] = true - glog.V(2).Infof("findVersionsRecursively: found .versions directory for object %s", objectKey) + glog.V(2).Infof("Found .versions directory for object %s (normalized: %s)", objectKey, normalizedObjectKey) - versions, err := s3a.getObjectVersionList(bucket, objectKey) + versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey) if err != nil { - glog.Warningf("Failed to get versions for object %s: %v", objectKey, err) + glog.Warningf("Failed to get versions for object %s (normalized: %s): %v", objectKey, normalizedObjectKey, err) continue } for _, version := range versions { // Check for duplicate version IDs and skip if already seen - versionKey := objectKey + ":" + version.VersionId + // Use normalized key for deduplication + versionKey := normalizedObjectKey + ":" + version.VersionId if seenVersionIds[versionKey] { - glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, objectKey) + glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, normalizedObjectKey) continue } seenVersionIds[versionKey] = true if version.IsDeleteMarker { + glog.V(0).Infof("Adding delete marker from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s", + normalizedObjectKey, version.VersionId, version.IsLatest, versionKey) deleteMarker := &DeleteMarkerEntry{ - Key: objectKey, + Key: normalizedObjectKey, // Use normalized key for consistency VersionId: version.VersionId, IsLatest: version.IsLatest, LastModified: version.LastModified, - Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey), + Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey), } *allVersions = append(*allVersions, deleteMarker) } else { + glog.V(0).Infof("Adding version from .versions: objectKey=%s, versionId=%s, isLatest=%v, versionKey=%s", + normalizedObjectKey, version.VersionId, version.IsLatest, versionKey) versionEntry := &VersionEntry{ - Key: objectKey, + Key: normalizedObjectKey, // Use normalized key for consistency VersionId: version.VersionId, IsLatest: version.IsLatest, LastModified: version.LastModified, ETag: version.ETag, Size: version.Size, - Owner: s3a.getObjectOwnerFromVersion(version, bucket, objectKey), + Owner: s3a.getObjectOwnerFromVersion(version, bucket, normalizedObjectKey), StorageClass: "STANDARD", } *allVersions = append(*allVersions, versionEntry) @@ -376,32 +395,85 @@ func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string // This is a regular file - check if it's a pre-versioning object objectKey := entryPath + // Normalize object key to ensure consistency with other version operations + normalizedObjectKey := removeDuplicateSlashes(objectKey) + // Skip if this object already has a .versions directory (already processed) - if processedObjects[objectKey] { + // Check both normalized and original keys for backward compatibility + if processedObjects[objectKey] || processedObjects[normalizedObjectKey] { + glog.V(0).Infof("Skipping already processed object: objectKey=%s, normalizedObjectKey=%s, processedObjects[objectKey]=%v, processedObjects[normalizedObjectKey]=%v", + objectKey, normalizedObjectKey, processedObjects[objectKey], processedObjects[normalizedObjectKey]) continue } - // This is a pre-versioning object - treat it as a version with VersionId="null" - glog.V(2).Infof("findVersionsRecursively: found pre-versioning object %s", objectKey) + glog.V(0).Infof("Processing regular file: objectKey=%s, normalizedObjectKey=%s, NOT in processedObjects", objectKey, normalizedObjectKey) - // Check if this null version should be marked as latest - // It's only latest if there's no .versions directory OR no latest version metadata - isLatest := true - versionsObjectPath := objectKey + ".versions" - if versionsEntry, err := s3a.getEntry(currentPath, versionsObjectPath); err == nil { - // .versions directory exists, check if there's latest version metadata - if versionsEntry.Extended != nil { - if _, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest { - // There is a latest version in the .versions directory, so null is not latest - isLatest = false - glog.V(2).Infof("findVersionsRecursively: null version for %s is not latest due to versioned objects", objectKey) + // This is a pre-versioning or suspended-versioning object + // Check if this file has version metadata (ExtVersionIdKey) + hasVersionMeta := false + if entry.Extended != nil { + if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { + hasVersionMeta = true + glog.V(0).Infof("Regular file %s has version metadata: %s", normalizedObjectKey, string(versionIdBytes)) + } + } + + // Check if a .versions directory exists for this object + versionsObjectPath := normalizedObjectKey + ".versions" + _, versionsErr := s3a.getEntry(currentPath, versionsObjectPath) + if versionsErr == nil { + // .versions directory exists + glog.V(0).Infof("Found .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta) + + // If this file has version metadata, it's a suspended versioning null version + // Include it and it will be the latest + if hasVersionMeta { + glog.V(0).Infof("Including suspended versioning file %s (has version metadata)", normalizedObjectKey) + // Continue to add it below + } else { + // No version metadata - this is a pre-versioning file + // Skip it if there's already a null version in .versions + versions, err := s3a.getObjectVersionList(bucket, normalizedObjectKey) + if err == nil { + hasNullVersion := false + for _, v := range versions { + if v.VersionId == "null" { + hasNullVersion = true + break + } + } + if hasNullVersion { + glog.V(0).Infof("Skipping pre-versioning file %s, null version exists in .versions", normalizedObjectKey) + processedObjects[objectKey] = true + processedObjects[normalizedObjectKey] = true + continue + } } + glog.V(0).Infof("Including pre-versioning file %s (no null version in .versions)", normalizedObjectKey) } + } else { + glog.V(0).Infof("No .versions directory for regular file %s, hasVersionMeta=%v", normalizedObjectKey, hasVersionMeta) + } + + // Add this file as a null version with IsLatest=true + isLatest := true + + // Check for duplicate version IDs and skip if already seen + // Use normalized key for deduplication to match how other version operations work + versionKey := normalizedObjectKey + ":null" + if seenVersionIds[versionKey] { + glog.Warningf("findVersionsRecursively: duplicate null version for object %s detected (versionKey=%s), skipping", normalizedObjectKey, versionKey) + continue } + seenVersionIds[versionKey] = true etag := s3a.calculateETagFromChunks(entry.Chunks) + + glog.V(0).Infof("Adding null version from regular file: objectKey=%s, normalizedObjectKey=%s, versionKey=%s, isLatest=%v, hasVersionMeta=%v", + objectKey, normalizedObjectKey, versionKey, isLatest, hasVersionMeta) + versionEntry := &VersionEntry{ - Key: objectKey, + Key: normalizedObjectKey, // Use normalized key for consistency VersionId: "null", IsLatest: isLatest, LastModified: time.Unix(entry.Attributes.Mtime, 0), @@ -535,23 +607,26 @@ func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) st // getSpecificObjectVersion retrieves a specific version of an object func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) { + // Normalize object path to ensure consistency with toFilerUrl behavior + normalizedObject := removeDuplicateSlashes(object) + if versionId == "" { // Get current version - return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/")) + return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(normalizedObject, "/")) } if versionId == "null" { // "null" version ID refers to pre-versioning objects stored as regular files bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err := s3a.getEntry(bucketDir, object) + entry, err := s3a.getEntry(bucketDir, normalizedObject) if err != nil { - return nil, fmt.Errorf("null version object %s not found: %v", object, err) + return nil, fmt.Errorf("null version object %s not found: %v", normalizedObject, err) } return entry, nil } // Get specific version from .versions directory - versionsDir := s3a.getVersionedObjectDir(bucket, object) + versionsDir := s3a.getVersionedObjectDir(bucket, normalizedObject) versionFile := s3a.getVersionFileName(versionId) entry, err := s3a.getEntry(versionsDir, versionFile) @@ -564,6 +639,9 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin // deleteSpecificObjectVersion deletes a specific version of an object func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error { + // Normalize object path to ensure consistency with toFilerUrl behavior + normalizedObject := removeDuplicateSlashes(object) + if versionId == "" { return fmt.Errorf("version ID is required for version-specific deletion") } @@ -571,7 +649,7 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st if versionId == "null" { // Delete "null" version (pre-versioning object stored as regular file) bucketDir := s3a.option.BucketsPath + "/" + bucket - cleanObject := strings.TrimPrefix(object, "/") + cleanObject := strings.TrimPrefix(normalizedObject, "/") // Check if the object exists _, err := s3a.getEntry(bucketDir, cleanObject) @@ -594,11 +672,11 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st return nil } - versionsDir := s3a.getVersionedObjectDir(bucket, object) + versionsDir := s3a.getVersionedObjectDir(bucket, normalizedObject) versionFile := s3a.getVersionFileName(versionId) // Check if this is the latest version before attempting deletion (for potential metadata update) - versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions") + versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), normalizedObject+".versions") isLatestVersion := false if dirErr == nil && versionsEntry.Extended != nil { if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest { @@ -765,39 +843,76 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http // getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) { + // Normalize object path to ensure consistency with toFilerUrl behavior + normalizedObject := removeDuplicateSlashes(object) + bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsObjectPath := object + ".versions" + versionsObjectPath := normalizedObject + ".versions" + + glog.V(1).Infof("getLatestObjectVersion: looking for latest version of %s/%s (normalized: %s)", bucket, object, normalizedObject) + + // Get the .versions directory entry to read latest version metadata with retry logic for filer consistency + var versionsEntry *filer_pb.Entry + var err error + maxRetries := 8 + for attempt := 1; attempt <= maxRetries; attempt++ { + versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath) + if err == nil { + break + } + + if attempt < maxRetries { + // Exponential backoff with higher base: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 6400ms + delay := time.Millisecond * time.Duration(100*(1<<(attempt-1))) + time.Sleep(delay) + } + } - // Get the .versions directory entry to read latest version metadata - versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath) if err != nil { // .versions directory doesn't exist - this can happen for objects that existed // before versioning was enabled on the bucket. Fall back to checking for a // regular (non-versioned) object file. - glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s%s, checking for pre-versioning object", bucket, object) + glog.V(1).Infof("getLatestObjectVersion: no .versions directory for %s%s after %d attempts (error: %v), checking for pre-versioning object", bucket, normalizedObject, maxRetries, err) - regularEntry, regularErr := s3a.getEntry(bucketDir, object) + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) if regularErr != nil { - return nil, fmt.Errorf("failed to get %s%s .versions directory and no regular object found: %w", bucket, object, err) + glog.V(1).Infof("getLatestObjectVersion: no pre-versioning object found for %s%s (error: %v)", bucket, normalizedObject, regularErr) + return nil, fmt.Errorf("failed to get %s%s .versions directory and no regular object found: %w", bucket, normalizedObject, err) } - glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, object) + glog.V(1).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, normalizedObject) return regularEntry, nil } - // Check if directory has latest version metadata + // Check if directory has latest version metadata - retry if missing due to race condition if versionsEntry.Extended == nil { - // No metadata means all versioned objects have been deleted. - // Fall back to checking for a pre-versioning object. - glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s%s, checking for pre-versioning object", bucket, object) + // Retry a few times to handle the race condition where directory exists but metadata is not yet written + metadataRetries := 3 + for metaAttempt := 1; metaAttempt <= metadataRetries; metaAttempt++ { + // Small delay and re-read the directory + time.Sleep(time.Millisecond * 100) + versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath) + if err != nil { + break + } - regularEntry, regularErr := s3a.getEntry(bucketDir, object) - if regularErr != nil { - return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object) + if versionsEntry.Extended != nil { + break + } } - glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s (no Extended metadata case)", bucket, object) - return regularEntry, nil + // If still no metadata after retries, fall back to pre-versioning object + if versionsEntry.Extended == nil { + glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s%s after retries, checking for pre-versioning object", bucket, object) + + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) + if regularErr != nil { + return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, normalizedObject) + } + + glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s (no Extended metadata case)", bucket, object) + return regularEntry, nil + } } latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] @@ -808,9 +923,9 @@ func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb // Fall back to checking for a pre-versioning object. glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object) - regularEntry, regularErr := s3a.getEntry(bucketDir, object) + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) if regularErr != nil { - return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, object) + return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s%s", bucket, normalizedObject) } glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s%s after version deletion", bucket, object) |
