aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_versioning.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_versioning.go')
-rw-r--r--weed/s3api/s3api_object_versioning.go422
1 files changed, 350 insertions, 72 deletions
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go
index cfb3d597c..d1893cb85 100644
--- a/weed/s3api/s3api_object_versioning.go
+++ b/weed/s3api/s3api_object_versioning.go
@@ -2,7 +2,6 @@ package s3api
import (
"crypto/rand"
- "crypto/sha256"
"encoding/hex"
"encoding/xml"
"fmt"
@@ -48,20 +47,26 @@ type ListObjectVersionsResult struct {
CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
}
-// generateVersionId creates a unique version ID
+// generateVersionId returns a 32-character hex version ID: a zero-padded 16-digit nanosecond timestamp followed by 16 random hex digits, so lexicographic order of IDs follows the timestamp
func generateVersionId() string {
- // Generate a random 16-byte value
- randBytes := make([]byte, 16)
+ // Use the current Unix time in nanoseconds as the leading, order-defining part of the ID
+ // Format as fixed-width 16-digit hex so string comparison matches numeric comparison (first 16 chars of version ID)
+ now := time.Now().UnixNano()
+ timestampHex := fmt.Sprintf("%016x", now)
+
+ // Generate random 8 bytes so IDs created in the same nanosecond still differ (last 16 chars of version ID)
+ randBytes := make([]byte, 8)
if _, err := rand.Read(randBytes); err != nil {
glog.Errorf("Failed to generate random bytes for version ID: %v", err)
- return ""
+ // Fallback: keep the 32-char shape by padding the timestamp with 16 zeros (uniqueness then rests on the timestamp alone)
+ return timestampHex + "0000000000000000"
}
- // Hash with current timestamp for uniqueness
- hash := sha256.Sum256(append(randBytes, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))...))
+ // Combine timestamp (16 hex chars) + random (8 bytes = 16 hex chars) = 32 chars total
+ randomHex := hex.EncodeToString(randBytes)
+ versionId := timestampHex + randomHex
- // Return first 32 characters of hex string (same length as AWS S3 version IDs)
- return hex.EncodeToString(hash[:])[:32]
+ return versionId
}
// getVersionedObjectDir returns the directory path for storing object versions
@@ -122,59 +127,20 @@ func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error
func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) {
var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
- // List all entries in bucket
- entries, _, err := s3a.list(path.Join(s3a.option.BucketsPath, bucket), prefix, keyMarker, false, uint32(maxKeys*2))
- if err != nil {
- return nil, err
- }
-
- // For each entry, check if it's a .versions directory
- for _, entry := range entries {
- if !entry.IsDirectory {
- continue
- }
-
- // Check if this is a .versions directory
- if !strings.HasSuffix(entry.Name, ".versions") {
- continue
- }
+ // Track objects that have been processed to avoid duplicates
+ processedObjects := make(map[string]bool)
- // Extract object name from .versions directory name
- objectKey := strings.TrimSuffix(entry.Name, ".versions")
+ // Track version IDs globally to prevent duplicates throughout the listing
+ seenVersionIds := make(map[string]bool)
- versions, err := s3a.getObjectVersionList(bucket, objectKey)
- if err != nil {
- glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
- continue
- }
-
- for _, version := range versions {
- if version.IsDeleteMarker {
- deleteMarker := &DeleteMarkerEntry{
- Key: objectKey,
- VersionId: version.VersionId,
- IsLatest: version.IsLatest,
- LastModified: version.LastModified,
- Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
- }
- allVersions = append(allVersions, deleteMarker)
- } else {
- versionEntry := &VersionEntry{
- Key: objectKey,
- VersionId: version.VersionId,
- IsLatest: version.IsLatest,
- LastModified: version.LastModified,
- ETag: version.ETag,
- Size: version.Size,
- Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
- StorageClass: "STANDARD",
- }
- allVersions = append(allVersions, versionEntry)
- }
- }
+ // Recursively find all .versions directories in the bucket
+ bucketPath := path.Join(s3a.option.BucketsPath, bucket)
+ err := s3a.findVersionsRecursively(bucketPath, "", &allVersions, processedObjects, seenVersionIds, bucket, prefix)
+ if err != nil {
+ return nil, err
}
- // Sort by key, then by LastModified and VersionId
+ // Sort by key, then by LastModified (newest first), then by VersionId for deterministic ordering
sort.Slice(allVersions, func(i, j int) bool {
var keyI, keyJ string
var lastModifiedI, lastModifiedJ time.Time
@@ -202,13 +168,20 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
versionIdJ = v.VersionId
}
+ // First sort by object key
if keyI != keyJ {
return keyI < keyJ
}
- if !lastModifiedI.Equal(lastModifiedJ) {
+
+ // Then by modification time (newest first); timestamps within 1ms of each other are treated as ties
+ timeDiff := lastModifiedI.Sub(lastModifiedJ)
+ if timeDiff.Abs() > time.Millisecond {
return lastModifiedI.After(lastModifiedJ)
}
- return versionIdI < versionIdJ
+
+ // For very close timestamps (within 1ms), use version ID for deterministic ordering
+ // Sort version IDs in reverse lexicographic order to maintain newest-first semantics
+ return versionIdI > versionIdJ
})
// Build result
@@ -237,6 +210,10 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
}
}
+ // Always initialize empty slices so boto3 gets the expected fields even when empty
+ result.Versions = make([]VersionEntry, 0)
+ result.DeleteMarkers = make([]DeleteMarkerEntry, 0)
+
// Add versions to result
for _, version := range allVersions {
switch v := version.(type) {
@@ -250,6 +227,128 @@ func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdM
return result, nil
}
+// findVersionsRecursively walks the directory tree under currentPath and appends every
+// object version it finds to allVersions.
+//
+// Parameters:
+//   - currentPath: filer directory listed by this call.
+//   - relativePath: currentPath relative to the bucket root ("" for the bucket itself);
+//     it is joined with entry names to form object keys.
+//   - allVersions: receives *VersionEntry and *DeleteMarkerEntry values.
+//   - processedObjects: object keys whose .versions directory has already been handled.
+//   - seenVersionIds: "key:versionId" pairs already emitted, used to drop duplicates.
+//   - bucket: bucket name passed to getObjectVersionList.
+//   - prefix: key prefix filter; a leading "/" is ignored and "" matches everything.
+//
+// An error is returned only when listing currentPath itself fails. Failures while reading
+// one object's versions or while walking a subdirectory are logged and skipped.
+func (s3a *S3ApiServer) findVersionsRecursively(currentPath, relativePath string, allVersions *[]interface{}, processedObjects map[string]bool, seenVersionIds map[string]bool, bucket, prefix string) error {
+	// NOTE(review): at most 1000 entries are requested per directory and the second return
+	// value of s3a.list is ignored, so larger directories may be truncated - confirm and
+	// paginate if that is the case.
+	entries, _, err := s3a.list(currentPath, "", "", false, 1000)
+	if err != nil {
+		return err
+	}
+
+	// entryPath values are built without a leading slash, so compare against the prefix
+	// with any leading slash removed. An empty prefix matches every key.
+	keyPrefix := strings.TrimPrefix(prefix, "/")
+
+	for _, entry := range entries {
+		entryPath := path.Join(relativePath, entry.Name)
+
+		if !entry.IsDirectory {
+			// This is a regular file - check if it's a pre-versioning object
+			objectKey := entryPath
+			if !strings.HasPrefix(objectKey, keyPrefix) {
+				continue
+			}
+
+			// Skip if the .versions directory of this object was already processed.
+			if processedObjects[objectKey] {
+				continue
+			}
+
+			// This is a pre-versioning object - treat it as a version with VersionId="null"
+			glog.V(2).Infof("findVersionsRecursively: found pre-versioning object %s", objectKey)
+
+			// The null version is latest unless a sibling .versions directory records a
+			// latest version ID. The lookup is relative to currentPath, so it must use the
+			// entry name only: objectKey already contains relativePath and would apply the
+			// relative path twice for nested objects.
+			isLatest := true
+			if versionsEntry, err := s3a.getEntry(currentPath, entry.Name+".versions"); err == nil {
+				// Reading from a nil map is safe, so no separate nil check is needed.
+				if _, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
+					isLatest = false
+					glog.V(2).Infof("findVersionsRecursively: null version for %s is not latest due to versioned objects", objectKey)
+				}
+			}
+
+			etag := s3a.calculateETagFromChunks(entry.Chunks)
+			versionEntry := &VersionEntry{
+				Key:          objectKey,
+				VersionId:    "null",
+				IsLatest:     isLatest,
+				LastModified: time.Unix(entry.Attributes.Mtime, 0),
+				ETag:         etag,
+				Size:         int64(entry.Attributes.FileSize),
+				Owner:        CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+				StorageClass: "STANDARD",
+			}
+			*allVersions = append(*allVersions, versionEntry)
+			continue
+		}
+
+		// Skip .uploads directory (multipart upload temporary files)
+		if strings.HasPrefix(entry.Name, ".uploads") {
+			continue
+		}
+
+		if strings.HasSuffix(entry.Name, ".versions") {
+			// The object key is the directory path without the ".versions" suffix; the
+			// prefix filter applies to that key, not to the directory name.
+			objectKey := strings.TrimSuffix(entryPath, ".versions")
+			if !strings.HasPrefix(objectKey, keyPrefix) {
+				continue
+			}
+			processedObjects[objectKey] = true
+
+			glog.V(2).Infof("findVersionsRecursively: found .versions directory for object %s", objectKey)
+
+			versions, err := s3a.getObjectVersionList(bucket, objectKey)
+			if err != nil {
+				glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
+				continue
+			}
+
+			for _, version := range versions {
+				// Check for duplicate version IDs and skip if already seen
+				versionKey := objectKey + ":" + version.VersionId
+				if seenVersionIds[versionKey] {
+					glog.Warningf("findVersionsRecursively: duplicate version %s for object %s detected, skipping", version.VersionId, objectKey)
+					continue
+				}
+				seenVersionIds[versionKey] = true
+
+				if version.IsDeleteMarker {
+					deleteMarker := &DeleteMarkerEntry{
+						Key:          objectKey,
+						VersionId:    version.VersionId,
+						IsLatest:     version.IsLatest,
+						LastModified: version.LastModified,
+						Owner:        CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+					}
+					*allVersions = append(*allVersions, deleteMarker)
+				} else {
+					versionEntry := &VersionEntry{
+						Key:          objectKey,
+						VersionId:    version.VersionId,
+						IsLatest:     version.IsLatest,
+						LastModified: version.LastModified,
+						ETag:         version.ETag,
+						Size:         version.Size,
+						Owner:        CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+						StorageClass: "STANDARD",
+					}
+					*allVersions = append(*allVersions, versionEntry)
+				}
+			}
+			continue
+		}
+
+		// Plain directory: descend when it lies under the prefix, or when the prefix points
+		// somewhere below it (e.g. prefix "a/b/" must still walk through directory "a").
+		underPrefix := strings.HasPrefix(entryPath, keyPrefix)
+		leadsToPrefix := strings.HasPrefix(keyPrefix, entryPath+"/")
+		if !underPrefix && !leadsToPrefix {
+			continue
+		}
+
+		fullPath := path.Join(currentPath, entry.Name)
+		if err := s3a.findVersionsRecursively(fullPath, entryPath, allVersions, processedObjects, seenVersionIds, bucket, prefix); err != nil {
+			// Best effort: one unreadable subdirectory should not fail the whole listing.
+			glog.Warningf("Error searching subdirectory %s: %v", entryPath, err)
+		}
+	}
+
+	return nil
+}
+
+
// getObjectVersionList returns all versions of a specific object
func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
var versions []*ObjectVersion
@@ -287,6 +386,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
+ // Use a map to detect and prevent duplicate version IDs
+ seenVersionIds := make(map[string]bool)
+
for i, entry := range entries {
if entry.Extended == nil {
glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
@@ -301,6 +403,13 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
versionId := string(versionIdBytes)
+ // Check for duplicate version IDs and skip if already seen
+ if seenVersionIds[versionId] {
+ glog.Warningf("getObjectVersionList: duplicate version ID %s detected for object %s/%s, skipping", versionId, bucket, object)
+ continue
+ }
+ seenVersionIds[versionId] = true
+
// Check if this version is the latest by comparing with directory metadata
isLatest := (versionId == latestVersionId)
@@ -331,12 +440,9 @@ func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVe
versions = append(versions, version)
}
- // Sort by modification time (newest first)
- sort.Slice(versions, func(i, j int) bool {
- return versions[i].LastModified.After(versions[j].LastModified)
- })
+ // Don't sort here - let the main listObjectVersions function handle sorting consistently
- glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s", len(versions), bucket, object)
+ glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s (after deduplication from %d entries)", len(versions), bucket, object, len(entries))
for i, version := range versions {
glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
}
@@ -366,6 +472,16 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin
return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
}
+ if versionId == "null" {
+ // "null" version ID refers to pre-versioning objects stored as regular files
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ entry, err := s3a.getEntry(bucketDir, object)
+ if err != nil {
+ return nil, fmt.Errorf("null version object %s not found: %v", object, err)
+ }
+ return entry, nil
+ }
+
// Get specific version from .versions directory
versionsDir := s3a.getVersionedObjectDir(bucket, object)
versionFile := s3a.getVersionFileName(versionId)
@@ -384,6 +500,32 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
return fmt.Errorf("version ID is required for version-specific deletion")
}
+ if versionId == "null" {
+ // Delete "null" version (pre-versioning object stored as regular file)
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ cleanObject := strings.TrimPrefix(object, "/")
+
+ // Check if the object exists
+ _, err := s3a.getEntry(bucketDir, cleanObject)
+ if err != nil {
+ // Object doesn't exist - this is OK for delete operations (idempotent)
+ glog.V(2).Infof("deleteSpecificObjectVersion: null version object %s already deleted or doesn't exist", cleanObject)
+ return nil
+ }
+
+ // Delete the regular file
+ deleteErr := s3a.rm(bucketDir, cleanObject, true, false)
+ if deleteErr != nil {
+ // Check if file was already deleted by another process
+ if _, checkErr := s3a.getEntry(bucketDir, cleanObject); checkErr != nil {
+ // File doesn't exist anymore, deletion was successful
+ return nil
+ }
+ return fmt.Errorf("failed to delete null version %s: %v", cleanObject, deleteErr)
+ }
+ return nil
+ }
+
versionsDir := s3a.getVersionedObjectDir(bucket, object)
versionFile := s3a.getVersionFileName(versionId)
@@ -393,16 +535,120 @@ func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId st
return fmt.Errorf("version %s not found: %v", versionId, err)
}
- // Version exists, delete it
+ // Check if this is the latest version before deleting
+ versionsEntry, dirErr := s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), object+".versions")
+ isLatestVersion := false
+ if dirErr == nil && versionsEntry.Extended != nil {
+ if latestVersionIdBytes, hasLatest := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatest {
+ isLatestVersion = (string(latestVersionIdBytes) == versionId)
+ }
+ }
+
+ // Delete the version file
deleteErr := s3a.rm(versionsDir, versionFile, true, false)
if deleteErr != nil {
// Check if file was already deleted by another process
if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
// File doesn't exist anymore, deletion was successful
- return nil
+ } else {
+ return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
}
- return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
}
+
+ // If we deleted the latest version, update the .versions directory metadata to point to the new latest
+ if isLatestVersion {
+ err := s3a.updateLatestVersionAfterDeletion(bucket, object)
+ if err != nil {
+ glog.Warningf("deleteSpecificObjectVersion: failed to update latest version after deletion: %v", err)
+ // Don't return error since the deletion was successful
+ }
+ }
+
+ return nil
+}
+
+// updateLatestVersionAfterDeletion recomputes the "latest version" metadata stored on an
+// object's .versions directory after the previous latest version has been removed.
+//
+// It lists the entries remaining in <bucket>/<object>.versions, ignores entries without a
+// version ID and entries flagged as delete markers, and picks the one whose version ID
+// compares greatest as a string. The chosen ID and file name are written to the directory's
+// extended attributes; when nothing qualifies, both attributes are removed.
+//
+// It returns an error when the listing, the directory lookup, or the metadata write fails.
+func (s3a *S3ApiServer) updateLatestVersionAfterDeletion(bucket, object string) error {
+	bucketDir := s3a.option.BucketsPath + "/" + bucket
+	cleanObject := strings.TrimPrefix(object, "/")
+	versionsObjectPath := cleanObject + ".versions"
+	versionsDir := bucketDir + "/" + versionsObjectPath
+
+	glog.V(1).Infof("updateLatestVersionAfterDeletion: updating latest version for %s/%s, listing %s", bucket, object, versionsDir)
+
+	// List all remaining version files in the .versions directory.
+	// NOTE(review): only the first 1000 entries are requested; confirm whether objects with
+	// more versions need pagination here.
+	entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
+	if err != nil {
+		glog.Errorf("updateLatestVersionAfterDeletion: failed to list versions in %s: %v", versionsDir, err)
+		return fmt.Errorf("failed to list versions: %w", err)
+	}
+
+	glog.V(1).Infof("updateLatestVersionAfterDeletion: found %d entries in %s", len(entries), versionsDir)
+
+	// Find the most recent remaining version (greatest version ID)
+	var latestVersionId string
+	var latestVersionFileName string
+
+	for _, entry := range entries {
+		// Reading from a nil Extended map is safe and simply reports "not present".
+		versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
+		if !hasVersionId {
+			continue
+		}
+
+		versionId := string(versionIdBytes)
+
+		// Skip delete markers when finding latest content version
+		if string(entry.Extended[s3_constants.ExtDeleteMarkerKey]) == "true" {
+			continue
+		}
+
+		// Version IDs produced by generateVersionId start with a fixed-width hex timestamp,
+		// so string comparison orders them chronologically.
+		// NOTE(review): IDs created by the earlier hash-based generator carry no timestamp
+		// and would not order correctly here - confirm whether such IDs can still exist.
+		if latestVersionId == "" || versionId > latestVersionId {
+			glog.V(1).Infof("updateLatestVersionAfterDeletion: found newer version %s (file: %s)", versionId, entry.Name)
+			latestVersionId = versionId
+			latestVersionFileName = entry.Name
+		} else {
+			glog.V(1).Infof("updateLatestVersionAfterDeletion: skipping older version %s", versionId)
+		}
+	}
+
+	// Update the .versions directory metadata
+	versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+	if err != nil {
+		return fmt.Errorf("failed to get .versions directory: %w", err)
+	}
+
+	// Writing to a nil map panics, so make sure the attribute map exists first.
+	if versionsEntry.Extended == nil {
+		versionsEntry.Extended = make(map[string][]byte)
+	}
+
+	if latestVersionId != "" {
+		// Update metadata to point to new latest version
+		versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(latestVersionId)
+		versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(latestVersionFileName)
+		glog.V(2).Infof("updateLatestVersionAfterDeletion: new latest version for %s/%s is %s", bucket, object, latestVersionId)
+	} else {
+		// No versions left, remove latest version metadata
+		delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
+		delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
+		glog.V(2).Infof("updateLatestVersionAfterDeletion: no versions left for %s/%s", bucket, object)
+	}
+
+	// Write the directory entry back, carrying over its attributes and chunks unchanged.
+	err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
+		updatedEntry.Extended = versionsEntry.Extended
+		updatedEntry.Attributes = versionsEntry.Attributes
+		updatedEntry.Chunks = versionsEntry.Chunks
+	})
+	if err != nil {
+		return fmt.Errorf("failed to update .versions directory metadata: %w", err)
+	}
+
return nil
}
@@ -450,24 +696,56 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
bucketDir := s3a.option.BucketsPath + "/" + bucket
- versionsObjectPath := object + ".versions"
+ cleanObject := strings.TrimPrefix(object, "/")
+ versionsObjectPath := cleanObject + ".versions"
// Get the .versions directory entry to read latest version metadata
versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
if err != nil {
- return nil, fmt.Errorf("failed to get .versions directory: %w", err)
+ // .versions directory doesn't exist - this can happen for objects that existed
+ // before versioning was enabled on the bucket. Fall back to checking for a
+ // regular (non-versioned) object file.
+ glog.V(2).Infof("getLatestObjectVersion: no .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
+
+ regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
+ if regularErr != nil {
+ return nil, fmt.Errorf("failed to get %s/%s .versions directory and no regular object found: %w", bucket, cleanObject, err)
+ }
+
+ glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s", bucket, cleanObject)
+ return regularEntry, nil
}
// Check if directory has latest version metadata
if versionsEntry.Extended == nil {
- return nil, fmt.Errorf("no version metadata found in .versions directory for %s/%s", bucket, object)
+ // No metadata means all versioned objects have been deleted.
+ // Fall back to checking for a pre-versioning object.
+ glog.V(2).Infof("getLatestObjectVersion: no Extended metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, cleanObject)
+
+ regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
+ if regularErr != nil {
+ return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
+ }
+
+ glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s (no Extended metadata case)", bucket, cleanObject)
+ return regularEntry, nil
}
latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
if !hasLatestVersionId || !hasLatestVersionFile {
- return nil, fmt.Errorf("incomplete latest version metadata in .versions directory for %s/%s", bucket, object)
+ // No version metadata means all versioned objects have been deleted.
+ // Fall back to checking for a pre-versioning object.
+ glog.V(2).Infof("getLatestObjectVersion: no version metadata in .versions directory for %s/%s, checking for pre-versioning object", bucket, object)
+
+ regularEntry, regularErr := s3a.getEntry(bucketDir, cleanObject)
+ if regularErr != nil {
+ return nil, fmt.Errorf("no version metadata in .versions directory and no regular object found for %s/%s", bucket, cleanObject)
+ }
+
+ glog.V(2).Infof("getLatestObjectVersion: found pre-versioning object for %s/%s after version deletion", bucket, cleanObject)
+ return regularEntry, nil
}
latestVersionId := string(latestVersionIdBytes)