diff options
Diffstat (limited to 'weed/s3api/s3api_object_versioning.go')
| -rw-r--r-- | weed/s3api/s3api_object_versioning.go | 422 |
1 files changed, 350 insertions, 72 deletions
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index cfb3d597c..d1893cb85 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -2,7 +2,6 @@ package s3api import ( "crypto/rand" - "crypto/sha256" "encoding/hex" "encoding/xml" "fmt" @@ -48,20 +47,26 @@ type ListObjectVersionsResult struct { CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"` } -// generateVersionId creates a unique version ID +// generateVersionId creates a unique version ID that preserves chronological order func generateVersionId() string { - // Generate a random 16-byte value - randBytes := make([]byte, 16) + // Use nanosecond timestamp to ensure chronological ordering + // Format as 16-digit hex (first 16 chars of version ID) + now := time.Now().UnixNano() + timestampHex := fmt.Sprintf("%016x", now) + + // Generate random 8 bytes for uniqueness (last 16 chars of version ID) + randBytes := make([]byte, 8) if _, err := rand.Read(randBytes); err != nil { glog.Errorf("Failed to generate random bytes for version ID: %v", err) - return "" + // Fallback to timestamp-only if random generation fails + return timestampHex + "0000000000000000" } - // Hash with current timestamp for uniqueness - hash := sha256.Sum256(append(randBytes, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))...)) + // Combine timestamp (16 chars) + random (16 chars) = 32 chars total + randomHex := hex.EncodeToString(randBytes) + versionId := timestampHex + randomHex - // Return first 32 characters of hex string (same length as AWS S3 version IDs) - return hex.EncodeToString(hash[:])[:32] + return versionId } // getVersionedObjectDir returns the directory path for storing object versions @@ -122,59 +127,20 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) { var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry - // List all entries in bucket - entries, _, err := s3a.list(path.Join(s3a.option.BucketsPath, bucket), prefix, keyMarker, false, uint32(maxKeys*2)) - if err != nil { - return nil, err - } - - // For each entry, check if it's a .versions directory - for _, entry := range entries { - if !entry.IsDirectory { - continue - } - - // Check if this is a .versions directory - if !strings.HasSuffix(entry.Name, ".versions") { - continue - } + // Track objects that have been processed to avoid duplicates + processedObjects := make(map[string]bool) - // Extract object name from .versions directory name - objectKey := strings.TrimSuffix(entry.Name, ".versions") + // Track version IDs globally to prevent duplicates throughout the listing + seenVersionIds := make(map[string]bool) - versions, err := s3a.getObjectVersionList(bucket, objectKey) - if err != nil { - glog.Warningf("Failed to get versions for object %s: %v", objectKey, err) - continue - } - - for _, version := range versions { - if version.IsDeleteMarker { - deleteMarker := &DeleteMarkerEntry{ - Key: objectKey, - VersionId: version.VersionId, - IsLatest: version.IsLatest, - LastModified: version.LastModified, - Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"}, - } - allVersions = append(allVersions, deleteMarker) - } else { - versionEntry := &VersionEntry{ - Key: objectKey, - VersionId: version.VersionId, - IsLatest: version.IsLatest, - LastModified: version.LastModified, - ETag: version.ETag, - Size: version.Size, - Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"}, - StorageClass: "STANDARD", - } - allVersions = append(allVersions, versionEntry) - } - } + // Recursively find all .versions directories in the bucket + bucketPath := path.Join(s3a.option.BucketsPath, bucket) + err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix) + if err != nil { + return nil, err } - // Sort by key, then by LastModified and VersionId + // Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering sort.Slice(allVersions, func(i, j int) bool { var keyI, keyJ string var lastModifiedI, lastModifiedJ time.Time @@ -202,13 +168,20 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM versionIdJ = v.VersionId } + // First sort by object key if keyI != keyJ { return keyI < keyJ } - if !lastModifiedI.Equal(lastModifiedJ) { + + // Then by modification time (newest first) - but use nanosecond precision for ties + timeDiff := lastModifiedI.Sub(lastModifiedJ) + if timeDiff.Abs() > time.Millisecond { return lastModifiedI.After(lastModifiedJ) } - return versionIdI < versionIdJ + + // For very close timestamps (within 1ms), use version ID for deterministic ordering + // Sort version IDs in reverse lexicographic order to maintain newest-first semantics + return versionIdI > versionIdJ }) // Build result @@ -237,6 +210,10 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM } } + // Always initialize empty slices so boto3 gets the expected fields even when empty + result.Versions = make([]VersionEntry, 0) + result.DeleteMarkers = make([]DeleteMarkerEntry, 0) + // Add versions to result for _, version := range allVersions { switch v := version.(type) { @@ -250,6 +227,128 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM return result, nil } +// findVersionsRecursively searches for all .versions directories and regular files recursively +func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string) error { + // List entries in current directory + entries, _, err := s3a.list(currentPath, "", "", false, 1000) + if err != nil { + return err + } + + for _, entry := range entries { + entryPath := path.Join(relativePath, entry.Name) + + // Skip if this doesn't match the prefix filter + if prefix != "" && !strings.HasPrefix(entryPath, strings.TrimPrefix(prefix, "/")) { + continue + } + + if entry.IsDirectory { + // Skip .uploads directory (multipart upload temporary files) + if strings.HasPrefix(entry.Name, ".uploads") { + continue + } + + // Check if this is a .versions directory + if strings.HasSuffix(entry.Name, ".versions") { + // Extract object name from .versions directory name + objectKey := strings.TrimSuffix(entryPath, ".versions") + processedObjects[objectKey] = true + + glog.V(2).Infof("findVersionsRecursively: found .versions directory for object %s", objectKey) + + versions, err := s3a.getObjectVersionList(bucket, objectKey) + if err != nil { + glog.Warningf("Failed to get versions for object %s: %v", objectKey, err) + continue + } + + for _, version := range versions { + // Check for duplicate version IDs and skip if already seen + versionKey := objectKey + ":" + version.VersionId + if seenVersionIds[versionKey] { + glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, objectKey) + continue + } + seenVersionIds[versionKey] = true + + if version.IsDeleteMarker { + deleteMarker := &DeleteMarkerEntry{ + Key: objectKey, + VersionId: version.VersionId, + IsLatest: version.IsLatest, + LastModified: version.LastModified, + Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"}, + } + *allVersions = append(*allVersions, deleteMarker) + } else { + versionEntry := &VersionEntry{ + Key: objectKey, + VersionId: version.VersionId, + IsLatest: version.IsLatest, + LastModified: version.LastModified, + ETag: version.ETag, + Size: version.Size, + Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"}, + StorageClass: "STANDARD", + } + *allVersions = append(*allVersions, versionEntry) + } + } + } else { + // Recursively search subdirectories + fullPath := path.Join(currentPath, entry.Name) + err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix) + if err != nil { + glog.Warningf("Error searching subdirectory %s: %v", entryPath, err) + continue + } + } + } else { + // This is a regular file - check if it's a pre-versioning object + objectKey := entryPath + + // Skip if this object already has a .versions directory (already processed) + if processedObjects[objectKey] { + continue + } + + // This is a pre-versioning object - treat it as a version with VersionId="null" + glog.V(2).Infof("findVersionsRecursively: found pre-versioning object %s", objectKey) + + // Check if this null version should be marked as latest + // It's only latest if there's no .versions directory OR no latest version metadata + isLatest := true + versionsObjectPath := objectKey + ".versions" + if versionsEntry, err := s3a.getEntry(currentPath, versionsObjectPath); err == nil { + // .versions directory exists, check if there's latest version metadata + if versionsEntry.Extended != nil { + if _, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest { + // There is a latest version in the .versions directory, so null is not latest + isLatest = false + glog.V(2).Infof("findVersionsRecursively: null version for %s is not latest due to versioned objects", objectKey) + } + } + } + + etag := s3a.calculateETagFromChunks(entry.Chunks) + versionEntry := &VersionEntry{ + Key: objectKey, + VersionId: "null", + IsLatest: isLatest, + LastModified: time.Unix(entry.Attributes.Mtime, 0), + ETag: etag, + Size: int64(entry.Attributes.FileSize), + Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"}, + StorageClass: "STANDARD", + } + *allVersions = append(*allVersions, versionEntry) + } + } + + return nil +} + // getObjectVersionList returns all versions of a specific object func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) { var versions []*ObjectVersion @@ -287,6 +386,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries)) + // Use a map to detect and prevent duplicate version IDs + seenVersionIds := make(map[string]bool) + for i, entry := range entries { if entry.Extended == nil { glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i) @@ -301,6 +403,13 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe versionId := string(versionIdBytes) + // Check for duplicate version IDs and skip if already seen + if seenVersionIds[versionId] { + glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object) + continue + } + seenVersionIds[versionId] = true + // Check if this version is the latest by comparing with directory metadata isLatest := (versionId == latestVersionId) @@ -331,12 +440,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe versions = append(versions, version) } - // Sort by modification time (newest first) - sort.Slice(versions, func(i, j int) bool { - return versions[i].LastModified.After(versions[j].LastModified) - }) + // Don't sort here - let the main listObjectVersions function handle sorting consistently - glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s", len(versions), bucket, object) + glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, len(entries)) for i, version := range versions { glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker) } @@ -366,6 +472,16 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/")) } + if versionId == "null" { + // "null" version ID refers to pre-versioning objects stored as regular files + bucketDir := s3a.option.BucketsPath + "/" + bucket + entry, err := s3a.getEntry(bucketDir, object) + if err != nil { + return nil, fmt.Errorf("null version object %s not found: %v", object, err) + } + return entry, nil + } + // Get specific version from .versions directory versionsDir := s3a.getVersionedObjectDir(bucket, object) versionFile := s3a.getVersionFileName(versionId) @@ -384,6 +500,32 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st return fmt.Errorf("version ID is required for version-specific deletion") } + if versionId == "null" { + // Delete "null" version (pre-versioning object stored as regular file) + bucketDir := s3a.option.BucketsPath + "/" + bucket + cleanObject := strings.TrimPrefix(object, "/") + + // Check if the object exists + _, err := s3a.getEntry(bucketDir, cleanObject) + if err != nil { + // Object doesn't exist - this is OK for delete operations (idempotent) + glog.V(2).Infof("deleteSpecificObjectVersion: null version object %s already deleted or doesn't exist", cleanObject) + return nil + } + + // Delete the regular file + deleteErr := s3a.rm(bucketDir, cleanObject, true, false) + if deleteErr != nil { + // Check if file was already deleted by another process + if _, checkErr := s3a.getEntry(bucketDir, cleanObject); checkErr != nil { + // File doesn't exist anymore, deletion was successful + return nil + } + return fmt.Errorf("failed to delete null version %s: %v", cleanObject, deleteErr) + } + return nil + } + versionsDir := s3a.getVersionedObjectDir(bucket, object) versionFile := s3a.getVersionFileName(versionId) @@ -393,16 +535,120 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st return fmt.Errorf("version %s not found: %v", versionId, err) } - // Version exists, delete it + // Check if this is the latest version before deleting + versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions") + isLatestVersion := false + if dirErr == nil && versionsEntry.Extended != nil { + if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest { + isLatestVersion = (string(latestVersionIdBytes) == versionId) + } + } + + // Delete the version file deleteErr := s3a.rm(versionsDir, versionFile, true, false) if deleteErr != nil { // Check if file was already deleted by another process if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil { // File doesn't exist anymore, deletion was successful - return nil + } else { + return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr) } - return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr) } + + // If we deleted the latest version, update the .versions directory metadata to point to the new latest + if isLatestVersion { + err := s3a.updateLatestVersionAfterDeletion(bucket, object) + if err != nil { + glog.Warningf("deleteSpecificObjectVersion: failed to update latest version after deletion: %v", err) + // Don't return error since the deletion was successful + } + } + + return nil +} + +// updateLatestVersionAfterDeletion finds the new latest version after deleting the current latest +func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error { + bucketDir := s3a.option.BucketsPath + "/" + bucket + cleanObject := strings.TrimPrefix(object, "/") + versionsObjectPath := cleanObject + ".versions" + versionsDir := bucketDir + "/" + versionsObjectPath + + glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir) + + // List all remaining version files in the .versions directory + entries, _, err := s3a.list(versionsDir, "", "", false, 1000) + if err != nil { + glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err) + return fmt.Errorf("failed to list versions: %v", err) + } + + glog.V(1).Infof("updateLatestVersionAfterDeletion: found %d entries in %s", len(entries), versionsDir) + + // Find the most recent remaining version (latest timestamp in version ID) + var latestVersionId string + var latestVersionFileName string + + for _, entry := range entries { + if entry.Extended == nil { + continue + } + + versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey] + if !hasVersionId { + continue + } + + versionId := string(versionIdBytes) + + // Skip delete markers when finding latest content version + isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey] + if string(isDeleteMarkerBytes) == "true" { + continue + } + + // Compare version IDs chronologically (our version IDs start with timestamp) + if latestVersionId == "" || versionId > latestVersionId { + glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name) + latestVersionId = versionId + latestVersionFileName = entry.Name + } else { + glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older version %s", versionId) + } + } + + // Update the .versions directory metadata + versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath) + if err != nil { + return fmt.Errorf("failed to get .versions directory: %v", err) + } + + if versionsEntry.Extended == nil { + versionsEntry.Extended = make(map[string][]byte) + } + + if latestVersionId != "" { + // Update metadata to point to new latest version + versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId) + versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName) + glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s", bucket, object, latestVersionId) + } else { + // No versions left, remove latest version metadata + delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey) + delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey) + glog.V(2).Infof("updateLatestVersionAfterDeletion: no versions left for %s/%s", bucket, object) + } + + // Update the .versions directory entry + err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = versionsEntry.Extended + updatedEntry.Attributes = versionsEntry.Attributes + updatedEntry.Chunks = versionsEntry.Chunks + }) + if err != nil { + return fmt.Errorf("failed to update .versions directory metadata: %v", err) + } + return nil } @@ -450,24 +696,56 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http // getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) { bucketDir := s3a.option.BucketsPath + "/" + bucket - versionsObjectPath := object + ".versions" + cleanObject := strings.TrimPrefix(object, "/") + versionsObjectPath := cleanObject + ".versions" // Get the .versions directory entry to read latest version metadata versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath) if err != nil { - return nil, fmt.Errorf("failed to get .versions directory: %w", err) + // .versions directory doesn't exist - this can happen for objects that existed + // before versioning was enabled on the bucket. Fall back to checking for a + // regular (non-versioned) object file. + glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s/%s, checking for pre-versioning object", bucket, object) + + regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject) + if regularErr != nil { + return nil, fmt.Errorf("failed to get %s/%s .versions directory and no regular object found: %w", bucket, cleanObject, err) + } + + glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, cleanObject) + return regularEntry, nil } // Check if directory has latest version metadata if versionsEntry.Extended == nil { - return nil, fmt.Errorf("no version metadata found in .versions directory for %s/%s", bucket, object) + // No metadata means all versioned objects have been deleted. + // Fall back to checking for a pre-versioning object. + glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, cleanObject) + + regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject) + if regularErr != nil { + return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject) + } + + glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s (no Extended metadata case)", bucket, cleanObject) + return regularEntry, nil } latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] if !hasLatestVersionId || !hasLatestVersionFile { - return nil, fmt.Errorf("incomplete latest version metadata in .versions directory for %s/%s", bucket, object) + // No version metadata means all versioned objects have been deleted. + // Fall back to checking for a pre-versioning object. + glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object) + + regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject) + if regularErr != nil { + return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject) + } + + glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s after version deletion", bucket, cleanObject) + return regularEntry, nil } latestVersionId := string(latestVersionIdBytes) |
