aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_versioning.go
diff options
context:
space:
mode:
author: Chris Lu <chrislusf@users.noreply.github.com>, 2025-07-09 01:51:45 -0700
committer: GitHub <noreply@github.com>, 2025-07-09 01:51:45 -0700
commit: cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba (patch)
tree: 3fb6c49d5a32e7a0518c268b984188e918c5e5ac /weed/s3api/s3api_object_versioning.go
parent: 8fa1a69f8c915311326e75645681d10f66d9e222 (diff)
download: seaweedfs-cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba.tar.xz
download: seaweedfs-cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba.zip
S3: add object versioning (#6945)
* add object versioning * add missing file * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * ListObjectVersionsResult is better to show multiple version entries * fix test * Update weed/s3api/s3api_object_handlers_put.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * multiple improvements * move PutBucketVersioningHandler into weed/s3api/s3api_bucket_handlers.go file * duplicated code for reading bucket config, versioningEnabled, etc. try to use functions * opportunity to cache bucket config * error handling if bucket is not found * in case bucket is not found * fix build * add object versioning tests * remove non-existent tests * add tests * add versioning tests * skip a new test * ensure .versions directory exists before saving info into it * fix creating version entry * logging on creating version directory * Update s3api_object_versioning_test.go * retry and wait for directory creation * revert add more logging * Update s3api_object_versioning.go * more debug messages * clean up logs, and touch directory correctly * log the .versions creation and then parent directory listing * use mkFile instead of touch touch is for update * clean up data * add versioning test in go * change location * if modified, latest version is moved to .versions directory, and create a new latest version Core versioning functionality: WORKING TestVersioningBasicWorkflow - PASS TestVersioningDeleteMarkers - PASS TestVersioningMultipleVersionsSameObject - PASS TestVersioningDeleteAndRecreate - PASS TestVersioningListWithPagination - PASS ❌ Some advanced 
features still failing: ETag calculation issues (using mtime instead of proper MD5) Specific version retrieval (EOF error) Version deletion (internal errors) Concurrent operations (race conditions) * calculate multi chunk md5 Test Results - All Passing: ✅ TestBucketListReturnDataVersioning - PASS ✅ TestVersioningCreateObjectsInOrder - PASS ✅ TestVersioningBasicWorkflow - PASS ✅ TestVersioningMultipleVersionsSameObject - PASS ✅ TestVersioningDeleteMarkers - PASS * dedupe * fix TestVersioningErrorCases * fix eof error of reading old versions * get specific version also check current version * enable integration tests for versioning * trigger action to work for now * Fix GitHub Actions S3 versioning tests workflow - Fix syntax error (incorrect indentation) - Update directory paths from weed/s3api/versioning_tests/ to test/s3/versioning/ - Add push trigger for add-object-versioning branch to enable CI during development - Update artifact paths to match correct directory structure * Improve CI robustness for S3 versioning tests Makefile improvements: - Increase server startup timeout from 30s to 90s for CI environments - Add progressive timeout reporting (logs at 30s, full logs at 90s) - Better error handling with server logs on failure - Add server PID tracking for debugging - Improved test failure reporting GitHub Actions workflow improvements: - Increase job timeouts to account for CI environment delays - Add system information logging (memory, disk space) - Add detailed failure reporting with server logs - Add process and network diagnostics on failure - Better error messaging and log collection These changes should resolve the 'Server failed to start within 30 seconds' issue that was causing the CI tests to fail. 
* adjust testing volume size * Update Makefile * Update Makefile * Update Makefile * Update Makefile * Update s3-versioning-tests.yml * Update s3api_object_versioning.go * Update Makefile * do not clean up * log received version id * more logs * printout response * print out list version response * use tmp files when put versioned object * change to versions folder layout * Delete weed-test.log * test with mixed versioned and unversioned objects * remove versionDirCache * remove unused functions * remove unused function * remove fallback checking * minor --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/s3api/s3api_object_versioning.go')
-rw-r--r-- weed/s3api/s3api_object_versioning.go 486
1 files changed, 486 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go
new file mode 100644
index 000000000..505605aa4
--- /dev/null
+++ b/weed/s3api/s3api_object_versioning.go
@@ -0,0 +1,486 @@
+package s3api
+
+import (
+ "crypto/rand"
+ "crypto/sha256"
+ "encoding/hex"
+ "encoding/xml"
+ "fmt"
+ "net/http"
+ "path"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+)
+
+// ObjectVersion represents a version of an S3 object
+type ObjectVersion struct {
+ VersionId string
+ IsLatest bool
+ IsDeleteMarker bool
+ LastModified time.Time
+ ETag string
+ Size int64
+ Entry *filer_pb.Entry
+}
+
+// ListObjectVersionsResult represents the response for ListObjectVersions
+type ListObjectVersionsResult struct {
+ XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`
+ Name string `xml:"Name"`
+ Prefix string `xml:"Prefix"`
+ KeyMarker string `xml:"KeyMarker,omitempty"`
+ VersionIdMarker string `xml:"VersionIdMarker,omitempty"`
+ NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
+ NextVersionIdMarker string `xml:"NextVersionIdMarker,omitempty"`
+ MaxKeys int `xml:"MaxKeys"`
+ Delimiter string `xml:"Delimiter,omitempty"`
+ IsTruncated bool `xml:"IsTruncated"`
+ Versions []VersionEntry `xml:"Version,omitempty"`
+ DeleteMarkers []DeleteMarkerEntry `xml:"DeleteMarker,omitempty"`
+ CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
+}
+
+// generateVersionId creates a unique version ID
+func generateVersionId() string {
+ // Generate a random 16-byte value
+ randBytes := make([]byte, 16)
+ if _, err := rand.Read(randBytes); err != nil {
+ glog.Errorf("Failed to generate random bytes for version ID: %v", err)
+ return ""
+ }
+
+ // Hash with current timestamp for uniqueness
+ hash := sha256.Sum256(append(randBytes, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))...))
+
+ // Return first 32 characters of hex string (same length as AWS S3 version IDs)
+ return hex.EncodeToString(hash[:])[:32]
+}
+
+// getVersionedObjectDir returns the directory path for storing object versions
+func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string {
+ return path.Join(s3a.option.BucketsPath, bucket, object+".versions")
+}
+
+// getVersionFileName returns the filename for a specific version
+func (s3a *S3ApiServer) getVersionFileName(versionId string) string {
+ return fmt.Sprintf("v_%s", versionId)
+}
+
+// createDeleteMarker creates a delete marker for versioned delete operations
+func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error) {
+ versionId := generateVersionId()
+
+ glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s", versionId, bucket, object)
+
+ // Create the version file name for the delete marker
+ versionFileName := s3a.getVersionFileName(versionId)
+
+ // Store delete marker in the .versions directory
+ // Make sure to clean up the object path to remove leading slashes
+ cleanObject := strings.TrimPrefix(object, "/")
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionsDir := bucketDir + "/" + cleanObject + ".versions"
+
+ // Create the delete marker entry in the .versions directory
+ err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
+ entry.Name = versionFileName
+ entry.IsDirectory = false
+ if entry.Attributes == nil {
+ entry.Attributes = &filer_pb.FuseAttributes{}
+ }
+ entry.Attributes.Mtime = time.Now().Unix()
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+ entry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("true")
+ })
+ if err != nil {
+ return "", fmt.Errorf("failed to create delete marker in .versions directory: %v", err)
+ }
+
+ // Update the .versions directory metadata to indicate this delete marker is the latest version
+ err = s3a.updateLatestVersionInDirectory(bucket, cleanObject, versionId, versionFileName)
+ if err != nil {
+ glog.Errorf("createDeleteMarker: failed to update latest version in directory: %v", err)
+ return "", fmt.Errorf("failed to update latest version in directory: %v", err)
+ }
+
+ glog.V(2).Infof("createDeleteMarker: successfully created delete marker %s for %s/%s", versionId, bucket, object)
+ return versionId, nil
+}
+
+// listObjectVersions lists all versions of an object
+func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) {
+ var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
+
+ // List all entries in bucket
+ entries, _, err := s3a.list(path.Join(s3a.option.BucketsPath, bucket), prefix, keyMarker, false, uint32(maxKeys*2))
+ if err != nil {
+ return nil, err
+ }
+
+ // For each entry, check if it's a .versions directory
+ for _, entry := range entries {
+ if !entry.IsDirectory {
+ continue
+ }
+
+ // Check if this is a .versions directory
+ if !strings.HasSuffix(entry.Name, ".versions") {
+ continue
+ }
+
+ // Extract object name from .versions directory name
+ objectKey := strings.TrimSuffix(entry.Name, ".versions")
+
+ versions, err := s3a.getObjectVersionList(bucket, objectKey)
+ if err != nil {
+ glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
+ continue
+ }
+
+ for _, version := range versions {
+ if version.IsDeleteMarker {
+ deleteMarker := &DeleteMarkerEntry{
+ Key: objectKey,
+ VersionId: version.VersionId,
+ IsLatest: version.IsLatest,
+ LastModified: version.LastModified,
+ Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+ }
+ allVersions = append(allVersions, deleteMarker)
+ } else {
+ versionEntry := &VersionEntry{
+ Key: objectKey,
+ VersionId: version.VersionId,
+ IsLatest: version.IsLatest,
+ LastModified: version.LastModified,
+ ETag: version.ETag,
+ Size: version.Size,
+ Owner: CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+ StorageClass: "STANDARD",
+ }
+ allVersions = append(allVersions, versionEntry)
+ }
+ }
+ }
+
+ // Sort by key, then by LastModified and VersionId
+ sort.Slice(allVersions, func(i, j int) bool {
+ var keyI, keyJ string
+ var lastModifiedI, lastModifiedJ time.Time
+ var versionIdI, versionIdJ string
+
+ switch v := allVersions[i].(type) {
+ case *VersionEntry:
+ keyI = v.Key
+ lastModifiedI = v.LastModified
+ versionIdI = v.VersionId
+ case *DeleteMarkerEntry:
+ keyI = v.Key
+ lastModifiedI = v.LastModified
+ versionIdI = v.VersionId
+ }
+
+ switch v := allVersions[j].(type) {
+ case *VersionEntry:
+ keyJ = v.Key
+ lastModifiedJ = v.LastModified
+ versionIdJ = v.VersionId
+ case *DeleteMarkerEntry:
+ keyJ = v.Key
+ lastModifiedJ = v.LastModified
+ versionIdJ = v.VersionId
+ }
+
+ if keyI != keyJ {
+ return keyI < keyJ
+ }
+ if !lastModifiedI.Equal(lastModifiedJ) {
+ return lastModifiedI.After(lastModifiedJ)
+ }
+ return versionIdI < versionIdJ
+ })
+
+ // Build result
+ result := &ListObjectVersionsResult{
+ Name: bucket,
+ Prefix: prefix,
+ KeyMarker: keyMarker,
+ MaxKeys: maxKeys,
+ Delimiter: delimiter,
+ IsTruncated: len(allVersions) > maxKeys,
+ }
+
+ // Limit results
+ if len(allVersions) > maxKeys {
+ allVersions = allVersions[:maxKeys]
+ result.IsTruncated = true
+
+ // Set next markers
+ switch v := allVersions[len(allVersions)-1].(type) {
+ case *VersionEntry:
+ result.NextKeyMarker = v.Key
+ result.NextVersionIdMarker = v.VersionId
+ case *DeleteMarkerEntry:
+ result.NextKeyMarker = v.Key
+ result.NextVersionIdMarker = v.VersionId
+ }
+ }
+
+ // Add versions to result
+ for _, version := range allVersions {
+ switch v := version.(type) {
+ case *VersionEntry:
+ result.Versions = append(result.Versions, *v)
+ case *DeleteMarkerEntry:
+ result.DeleteMarkers = append(result.DeleteMarkers, *v)
+ }
+ }
+
+ return result, nil
+}
+
+// getObjectVersionList returns all versions of a specific object
+func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
+ var versions []*ObjectVersion
+
+ glog.V(2).Infof("getObjectVersionList: looking for versions of %s/%s in .versions directory", bucket, object)
+
+ // All versions are now stored in the .versions directory only
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionsObjectPath := object + ".versions"
+ glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath)
+
+ // Get the .versions directory entry to read latest version metadata
+ versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ if err != nil {
+ // No versions directory exists, return empty list
+ glog.V(2).Infof("getObjectVersionList: no versions directory found: %v", err)
+ return versions, nil
+ }
+
+ // Get the latest version info from directory metadata
+ var latestVersionId string
+ if versionsEntry.Extended != nil {
+ if latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatestVersionId {
+ latestVersionId = string(latestVersionIdBytes)
+ glog.V(2).Infof("getObjectVersionList: latest version ID from directory metadata: %s", latestVersionId)
+ }
+ }
+
+ // List all version files in the .versions directory
+ entries, _, err := s3a.list(bucketDir+"/"+versionsObjectPath, "", "", false, 1000)
+ if err != nil {
+ glog.V(2).Infof("getObjectVersionList: failed to list version files: %v", err)
+ return versions, nil
+ }
+
+ glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
+
+ for i, entry := range entries {
+ if entry.Extended == nil {
+ glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
+ continue
+ }
+
+ versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
+ if !hasVersionId {
+ glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i)
+ continue
+ }
+
+ versionId := string(versionIdBytes)
+
+ // Check if this version is the latest by comparing with directory metadata
+ isLatest := (versionId == latestVersionId)
+
+ isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
+ isDeleteMarker := string(isDeleteMarkerBytes) == "true"
+
+ glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker)
+
+ version := &ObjectVersion{
+ VersionId: versionId,
+ IsLatest: isLatest,
+ IsDeleteMarker: isDeleteMarker,
+ LastModified: time.Unix(entry.Attributes.Mtime, 0),
+ Entry: entry,
+ }
+
+ if !isDeleteMarker {
+ // Try to get ETag from Extended attributes first
+ if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
+ version.ETag = string(etagBytes)
+ } else {
+ // Fallback: calculate ETag from chunks
+ version.ETag = s3a.calculateETagFromChunks(entry.Chunks)
+ }
+ version.Size = int64(entry.Attributes.FileSize)
+ }
+
+ versions = append(versions, version)
+ }
+
+ // Sort by modification time (newest first)
+ sort.Slice(versions, func(i, j int) bool {
+ return versions[i].LastModified.After(versions[j].LastModified)
+ })
+
+ glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s", len(versions), bucket, object)
+ for i, version := range versions {
+ glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
+ }
+
+ return versions, nil
+}
+
+// calculateETagFromChunks calculates ETag from file chunks following S3 multipart rules
+// This is a wrapper around filer.ETagChunks that adds quotes for S3 compatibility
+func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) string {
+ if len(chunks) == 0 {
+ return "\"\""
+ }
+
+ // Use the existing filer ETag calculation and add quotes for S3 compatibility
+ etag := filer.ETagChunks(chunks)
+ if etag == "" {
+ return "\"\""
+ }
+ return fmt.Sprintf("\"%s\"", etag)
+}
+
+// getSpecificObjectVersion retrieves a specific version of an object
+func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) {
+ if versionId == "" {
+ // Get current version
+ return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
+ }
+
+ // Get specific version from .versions directory
+ versionsDir := s3a.getVersionedObjectDir(bucket, object)
+ versionFile := s3a.getVersionFileName(versionId)
+
+ entry, err := s3a.getEntry(versionsDir, versionFile)
+ if err != nil {
+ return nil, fmt.Errorf("version %s not found: %v", versionId, err)
+ }
+
+ return entry, nil
+}
+
+// deleteSpecificObjectVersion deletes a specific version of an object
+func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error {
+ if versionId == "" {
+ return fmt.Errorf("version ID is required for version-specific deletion")
+ }
+
+ versionsDir := s3a.getVersionedObjectDir(bucket, object)
+ versionFile := s3a.getVersionFileName(versionId)
+
+ // Delete the specific version from .versions directory
+ _, err := s3a.getEntry(versionsDir, versionFile)
+ if err != nil {
+ return fmt.Errorf("version %s not found: %v", versionId, err)
+ }
+
+ // Version exists, delete it
+ deleteErr := s3a.rm(versionsDir, versionFile, true, false)
+ if deleteErr != nil {
+ // Check if file was already deleted by another process
+ if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
+ // File doesn't exist anymore, deletion was successful
+ return nil
+ }
+ return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
+ }
+ return nil
+}
+
+// ListObjectVersionsHandler handles the list object versions request
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
+func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
+ bucket, _ := s3_constants.GetBucketAndObject(r)
+ glog.V(3).Infof("ListObjectVersionsHandler %s", bucket)
+
+ if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, err)
+ return
+ }
+
+ // Parse query parameters
+ query := r.URL.Query()
+ prefix := query.Get("prefix")
+ if prefix != "" && !strings.HasPrefix(prefix, "/") {
+ prefix = "/" + prefix
+ }
+
+ keyMarker := query.Get("key-marker")
+ versionIdMarker := query.Get("version-id-marker")
+ delimiter := query.Get("delimiter")
+
+ maxKeysStr := query.Get("max-keys")
+ maxKeys := 1000
+ if maxKeysStr != "" {
+ if mk, err := strconv.Atoi(maxKeysStr); err == nil && mk > 0 {
+ maxKeys = mk
+ }
+ }
+
+ // List versions
+ result, err := s3a.listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter, maxKeys)
+ if err != nil {
+ glog.Errorf("ListObjectVersionsHandler: %v", err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+
+ writeSuccessResponseXML(w, r, result)
+}
+
+// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
+func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionsObjectPath := object + ".versions"
+
+ // Get the .versions directory entry to read latest version metadata
+ versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get .versions directory: %v", err)
+ }
+
+ // Check if directory has latest version metadata
+ if versionsEntry.Extended == nil {
+ return nil, fmt.Errorf("no version metadata found in .versions directory for %s/%s", bucket, object)
+ }
+
+ latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
+ latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
+
+ if !hasLatestVersionId || !hasLatestVersionFile {
+ return nil, fmt.Errorf("incomplete latest version metadata in .versions directory for %s/%s", bucket, object)
+ }
+
+ latestVersionId := string(latestVersionIdBytes)
+ latestVersionFile := string(latestVersionFileBytes)
+
+ glog.V(2).Infof("getLatestObjectVersion: found latest version %s (file: %s) for %s/%s", latestVersionId, latestVersionFile, bucket, object)
+
+ // Get the actual latest version file entry
+ latestVersionPath := versionsObjectPath + "/" + latestVersionFile
+ latestVersionEntry, err := s3a.getEntry(bucketDir, latestVersionPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get latest version file %s: %v", latestVersionPath, err)
+ }
+
+ return latestVersionEntry, nil
+}