| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-09 01:51:45 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-09 01:51:45 -0700 |
| commit | cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba (patch) | |
| tree | 3fb6c49d5a32e7a0518c268b984188e918c5e5ac /weed/s3api/s3api_object_versioning.go | |
| parent | 8fa1a69f8c915311326e75645681d10f66d9e222 (diff) | |
S3: add object versioning (#6945)
* add object versioning
* add missing file
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* ListObjectVersionsResult is better to show multiple version entries
* fix test
* Update weed/s3api/s3api_object_handlers_put.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* multiple improvements
* move PutBucketVersioningHandler into weed/s3api/s3api_bucket_handlers.go file
* duplicated code for reading bucket config, versioningEnabled, etc. try to use functions
* opportunity to cache bucket config
* error handling if bucket is not found
* in case bucket is not found
* fix build
* add object versioning tests
* remove non-existent tests
* add tests
* add versioning tests
* skip a new test
* ensure .versions directory exists before saving info into it
* fix creating version entry
* logging on creating version directory
* Update s3api_object_versioning_test.go
* retry and wait for directory creation
* revert add more logging
* Update s3api_object_versioning.go
* more debug messages
* clean up logs, and touch directory correctly
* log the .versions creation and then parent directory listing
* use mkFile instead of touch
touch is for update
* clean up data
* add versioning test in go
* change location
* if modified, latest version is moved to .versions directory, and create a new latest version
Core versioning functionality: WORKING
- TestVersioningBasicWorkflow - PASS
- TestVersioningDeleteMarkers - PASS
- TestVersioningMultipleVersionsSameObject - PASS
- TestVersioningDeleteAndRecreate - PASS
- TestVersioningListWithPagination - PASS
❌ Some advanced features still failing:
- ETag calculation issues (using mtime instead of proper MD5)
- Specific version retrieval (EOF error)
- Version deletion (internal errors)
- Concurrent operations (race conditions)
* calculate multi chunk md5
Test Results - All Passing:
✅ TestBucketListReturnDataVersioning - PASS
✅ TestVersioningCreateObjectsInOrder - PASS
✅ TestVersioningBasicWorkflow - PASS
✅ TestVersioningMultipleVersionsSameObject - PASS
✅ TestVersioningDeleteMarkers - PASS
* dedupe
* fix TestVersioningErrorCases
* fix eof error of reading old versions
* get specific version also check current version
* enable integration tests for versioning
* trigger action to work for now
* Fix GitHub Actions S3 versioning tests workflow
- Fix syntax error (incorrect indentation)
- Update directory paths from weed/s3api/versioning_tests/ to test/s3/versioning/
- Add push trigger for add-object-versioning branch to enable CI during development
- Update artifact paths to match correct directory structure
* Improve CI robustness for S3 versioning tests
Makefile improvements:
- Increase server startup timeout from 30s to 90s for CI environments
- Add progressive timeout reporting (logs at 30s, full logs at 90s)
- Better error handling with server logs on failure
- Add server PID tracking for debugging
- Improved test failure reporting
GitHub Actions workflow improvements:
- Increase job timeouts to account for CI environment delays
- Add system information logging (memory, disk space)
- Add detailed failure reporting with server logs
- Add process and network diagnostics on failure
- Better error messaging and log collection
These changes should resolve the 'Server failed to start within 30 seconds' issue
that was causing the CI tests to fail.
* adjust testing volume size
* Update Makefile
* Update Makefile
* Update Makefile
* Update Makefile
* Update s3-versioning-tests.yml
* Update s3api_object_versioning.go
* Update Makefile
* do not clean up
* log received version id
* more logs
* printout response
* print out list version response
* use tmp files when put versioned object
* change to versions folder layout
* Delete weed-test.log
* test with mixed versioned and unversioned objects
* remove versionDirCache
* remove unused functions
* remove unused function
* remove fallback checking
* minor
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/s3api/s3api_object_versioning.go')
| -rw-r--r-- | weed/s3api/s3api_object_versioning.go | 486 |
1 files changed, 486 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go
new file mode 100644
index 000000000..505605aa4
--- /dev/null
+++ b/weed/s3api/s3api_object_versioning.go
@@ -0,0 +1,486 @@
+package s3api
+
+import (
+	"crypto/rand"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/xml"
+	"fmt"
+	"net/http"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	s3_constants "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+)
+
+// ObjectVersion represents a version of an S3 object
+type ObjectVersion struct {
+	VersionId      string
+	IsLatest       bool
+	IsDeleteMarker bool
+	LastModified   time.Time
+	ETag           string
+	Size           int64
+	Entry          *filer_pb.Entry
+}
+
+// ListObjectVersionsResult represents the response for ListObjectVersions
+type ListObjectVersionsResult struct {
+	XMLName             xml.Name            `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListVersionsResult"`
+	Name                string              `xml:"Name"`
+	Prefix              string              `xml:"Prefix"`
+	KeyMarker           string              `xml:"KeyMarker,omitempty"`
+	VersionIdMarker     string              `xml:"VersionIdMarker,omitempty"`
+	NextKeyMarker       string              `xml:"NextKeyMarker,omitempty"`
+	NextVersionIdMarker string              `xml:"NextVersionIdMarker,omitempty"`
+	MaxKeys             int                 `xml:"MaxKeys"`
+	Delimiter           string              `xml:"Delimiter,omitempty"`
+	IsTruncated         bool                `xml:"IsTruncated"`
+	Versions            []VersionEntry      `xml:"Version,omitempty"`
+	DeleteMarkers       []DeleteMarkerEntry `xml:"DeleteMarker,omitempty"`
+	CommonPrefixes      []PrefixEntry       `xml:"CommonPrefixes,omitempty"`
+}
+
+// generateVersionId creates a unique version ID
+func generateVersionId() string {
+	// Generate a random 16-byte value
+	randBytes := make([]byte, 16)
+	if _, err := rand.Read(randBytes); err != nil {
+		glog.Errorf("Failed to generate random bytes for version ID: %v", err)
+		return ""
+	}
+
+	// Hash with current timestamp for uniqueness
+	hash := sha256.Sum256(append(randBytes, []byte(fmt.Sprintf("%d", time.Now().UnixNano()))...))
+
+	// Return first 32 characters of hex string (same length as AWS S3 version IDs)
+	return hex.EncodeToString(hash[:])[:32]
+}
+
+// getVersionedObjectDir returns the directory path for storing object versions
+func (s3a *S3ApiServer) getVersionedObjectDir(bucket, object string) string {
+	return path.Join(s3a.option.BucketsPath, bucket, object+".versions")
+}
+
+// getVersionFileName returns the filename for a specific version
+func (s3a *S3ApiServer) getVersionFileName(versionId string) string {
+	return fmt.Sprintf("v_%s", versionId)
+}
+
+// createDeleteMarker creates a delete marker for versioned delete operations
+func (s3a *S3ApiServer) createDeleteMarker(bucket, object string) (string, error) {
+	versionId := generateVersionId()
+
+	glog.V(2).Infof("createDeleteMarker: creating delete marker %s for %s/%s", versionId, bucket, object)
+
+	// Create the version file name for the delete marker
+	versionFileName := s3a.getVersionFileName(versionId)
+
+	// Store delete marker in the .versions directory
+	// Make sure to clean up the object path to remove leading slashes
+	cleanObject := strings.TrimPrefix(object, "/")
+	bucketDir := s3a.option.BucketsPath + "/" + bucket
+	versionsDir := bucketDir + "/" + cleanObject + ".versions"
+
+	// Create the delete marker entry in the .versions directory
+	err := s3a.mkFile(versionsDir, versionFileName, nil, func(entry *filer_pb.Entry) {
+		entry.Name = versionFileName
+		entry.IsDirectory = false
+		if entry.Attributes == nil {
+			entry.Attributes = &filer_pb.FuseAttributes{}
+		}
+		entry.Attributes.Mtime = time.Now().Unix()
+		if entry.Extended == nil {
+			entry.Extended = make(map[string][]byte)
+		}
+		entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+		entry.Extended[s3_constants.ExtDeleteMarkerKey] = []byte("true")
+	})
+	if err != nil {
+		return "", fmt.Errorf("failed to create delete marker in .versions directory: %v", err)
+	}
+
+	// Update the .versions directory metadata to indicate this delete marker is the latest version
+	err = s3a.updateLatestVersionInDirectory(bucket, cleanObject, versionId, versionFileName)
+	if err != nil {
+		glog.Errorf("createDeleteMarker: failed to update latest version in directory: %v", err)
+		return "", fmt.Errorf("failed to update latest version in directory: %v", err)
+	}
+
+	glog.V(2).Infof("createDeleteMarker: successfully created delete marker %s for %s/%s", versionId, bucket, object)
+	return versionId, nil
+}
+
+// listObjectVersions lists all versions of an object
+func (s3a *S3ApiServer) listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter string, maxKeys int) (*ListObjectVersionsResult, error) {
+	var allVersions []interface{} // Can contain VersionEntry or DeleteMarkerEntry
+
+	// List all entries in bucket
+	entries, _, err := s3a.list(path.Join(s3a.option.BucketsPath, bucket), prefix, keyMarker, false, uint32(maxKeys*2))
+	if err != nil {
+		return nil, err
+	}
+
+	// For each entry, check if it's a .versions directory
+	for _, entry := range entries {
+		if !entry.IsDirectory {
+			continue
+		}
+
+		// Check if this is a .versions directory
+		if !strings.HasSuffix(entry.Name, ".versions") {
+			continue
+		}
+
+		// Extract object name from .versions directory name
+		objectKey := strings.TrimSuffix(entry.Name, ".versions")
+
+		versions, err := s3a.getObjectVersionList(bucket, objectKey)
+		if err != nil {
+			glog.Warningf("Failed to get versions for object %s: %v", objectKey, err)
+			continue
+		}
+
+		for _, version := range versions {
+			if version.IsDeleteMarker {
+				deleteMarker := &DeleteMarkerEntry{
+					Key:          objectKey,
+					VersionId:    version.VersionId,
+					IsLatest:     version.IsLatest,
+					LastModified: version.LastModified,
+					Owner:        CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+				}
+				allVersions = append(allVersions, deleteMarker)
+			} else {
+				versionEntry := &VersionEntry{
+					Key:          objectKey,
+					VersionId:    version.VersionId,
+					IsLatest:     version.IsLatest,
+					LastModified: version.LastModified,
+					ETag:         version.ETag,
+					Size:         version.Size,
+					Owner:        CanonicalUser{ID: "unknown", DisplayName: "unknown"},
+					StorageClass: "STANDARD",
+				}
+				allVersions = append(allVersions, versionEntry)
+			}
+		}
+	}
+
+	// Sort by key, then by LastModified and VersionId
+	sort.Slice(allVersions, func(i, j int) bool {
+		var keyI, keyJ string
+		var lastModifiedI, lastModifiedJ time.Time
+		var versionIdI, versionIdJ string
+
+		switch v := allVersions[i].(type) {
+		case *VersionEntry:
+			keyI = v.Key
+			lastModifiedI = v.LastModified
+			versionIdI = v.VersionId
+		case *DeleteMarkerEntry:
+			keyI = v.Key
+			lastModifiedI = v.LastModified
+			versionIdI = v.VersionId
+		}
+
+		switch v := allVersions[j].(type) {
+		case *VersionEntry:
+			keyJ = v.Key
+			lastModifiedJ = v.LastModified
+			versionIdJ = v.VersionId
+		case *DeleteMarkerEntry:
+			keyJ = v.Key
+			lastModifiedJ = v.LastModified
+			versionIdJ = v.VersionId
+		}
+
+		if keyI != keyJ {
+			return keyI < keyJ
+		}
+		if !lastModifiedI.Equal(lastModifiedJ) {
+			return lastModifiedI.After(lastModifiedJ)
+		}
+		return versionIdI < versionIdJ
+	})
+
+	// Build result
+	result := &ListObjectVersionsResult{
+		Name:        bucket,
+		Prefix:      prefix,
+		KeyMarker:   keyMarker,
+		MaxKeys:     maxKeys,
+		Delimiter:   delimiter,
+		IsTruncated: len(allVersions) > maxKeys,
+	}
+
+	// Limit results
+	if len(allVersions) > maxKeys {
+		allVersions = allVersions[:maxKeys]
+		result.IsTruncated = true
+
+		// Set next markers
+		switch v := allVersions[len(allVersions)-1].(type) {
+		case *VersionEntry:
+			result.NextKeyMarker = v.Key
+			result.NextVersionIdMarker = v.VersionId
+		case *DeleteMarkerEntry:
+			result.NextKeyMarker = v.Key
+			result.NextVersionIdMarker = v.VersionId
+		}
+	}
+
+	// Add versions to result
+	for _, version := range allVersions {
+		switch v := version.(type) {
+		case *VersionEntry:
+			result.Versions = append(result.Versions, *v)
+		case *DeleteMarkerEntry:
+			result.DeleteMarkers = append(result.DeleteMarkers, *v)
+		}
+	}
+
+	return result, nil
+}
+
+// getObjectVersionList returns all versions of a specific object
+func (s3a *S3ApiServer) getObjectVersionList(bucket, object string) ([]*ObjectVersion, error) {
+	var versions []*ObjectVersion
+
+	glog.V(2).Infof("getObjectVersionList: looking for versions of %s/%s in .versions directory", bucket, object)
+
+	// All versions are now stored in the .versions directory only
+	bucketDir := s3a.option.BucketsPath + "/" + bucket
+	versionsObjectPath := object + ".versions"
+	glog.V(2).Infof("getObjectVersionList: checking versions directory %s", versionsObjectPath)
+
+	// Get the .versions directory entry to read latest version metadata
+	versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+	if err != nil {
+		// No versions directory exists, return empty list
+		glog.V(2).Infof("getObjectVersionList: no versions directory found: %v", err)
+		return versions, nil
+	}
+
+	// Get the latest version info from directory metadata
+	var latestVersionId string
+	if versionsEntry.Extended != nil {
+		if latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]; hasLatestVersionId {
+			latestVersionId = string(latestVersionIdBytes)
+			glog.V(2).Infof("getObjectVersionList: latest version ID from directory metadata: %s", latestVersionId)
+		}
+	}
+
+	// List all version files in the .versions directory
+	entries, _, err := s3a.list(bucketDir+"/"+versionsObjectPath, "", "", false, 1000)
+	if err != nil {
+		glog.V(2).Infof("getObjectVersionList: failed to list version files: %v", err)
+		return versions, nil
+	}
+
+	glog.V(2).Infof("getObjectVersionList: found %d entries in versions directory", len(entries))
+
+	for i, entry := range entries {
+		if entry.Extended == nil {
+			glog.V(2).Infof("getObjectVersionList: entry %d has no Extended metadata, skipping", i)
+			continue
+		}
+
+		versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
+		if !hasVersionId {
+			glog.V(2).Infof("getObjectVersionList: entry %d has no version ID, skipping", i)
+			continue
+		}
+
+		versionId := string(versionIdBytes)
+
+		// Check if this version is the latest by comparing with directory metadata
+		isLatest := (versionId == latestVersionId)
+
+		isDeleteMarkerBytes, _ := entry.Extended[s3_constants.ExtDeleteMarkerKey]
+		isDeleteMarker := string(isDeleteMarkerBytes) == "true"
+
+		glog.V(2).Infof("getObjectVersionList: found version %s, isLatest=%v, isDeleteMarker=%v", versionId, isLatest, isDeleteMarker)
+
+		version := &ObjectVersion{
+			VersionId:      versionId,
+			IsLatest:       isLatest,
+			IsDeleteMarker: isDeleteMarker,
+			LastModified:   time.Unix(entry.Attributes.Mtime, 0),
+			Entry:          entry,
+		}
+
+		if !isDeleteMarker {
+			// Try to get ETag from Extended attributes first
+			if etagBytes, hasETag := entry.Extended[s3_constants.ExtETagKey]; hasETag {
+				version.ETag = string(etagBytes)
+			} else {
+				// Fallback: calculate ETag from chunks
+				version.ETag = s3a.calculateETagFromChunks(entry.Chunks)
+			}
+			version.Size = int64(entry.Attributes.FileSize)
+		}
+
+		versions = append(versions, version)
+	}
+
+	// Sort by modification time (newest first)
+	sort.Slice(versions, func(i, j int) bool {
+		return versions[i].LastModified.After(versions[j].LastModified)
+	})
+
+	glog.V(2).Infof("getObjectVersionList: returning %d total versions for %s/%s", len(versions), bucket, object)
+	for i, version := range versions {
+		glog.V(2).Infof("getObjectVersionList: version %d: %s (isLatest=%v, isDeleteMarker=%v)", i, version.VersionId, version.IsLatest, version.IsDeleteMarker)
+	}
+
+	return versions, nil
+}
+
+// calculateETagFromChunks calculates ETag from file chunks following S3 multipart rules
+// This is a wrapper around filer.ETagChunks that adds quotes for S3 compatibility
+func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) string {
+	if len(chunks) == 0 {
+		return "\"\""
+	}
+
+	// Use the existing filer ETag calculation and add quotes for S3 compatibility
+	etag := filer.ETagChunks(chunks)
+	if etag == "" {
+		return "\"\""
+	}
+	return fmt.Sprintf("\"%s\"", etag)
+}
+
+// getSpecificObjectVersion retrieves a specific version of an object
+func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) {
+	if versionId == "" {
+		// Get current version
+		return s3a.getEntry(path.Join(s3a.option.BucketsPath, bucket), strings.TrimPrefix(object, "/"))
+	}
+
+	// Get specific version from .versions directory
+	versionsDir := s3a.getVersionedObjectDir(bucket, object)
+	versionFile := s3a.getVersionFileName(versionId)
+
+	entry, err := s3a.getEntry(versionsDir, versionFile)
+	if err != nil {
+		return nil, fmt.Errorf("version %s not found: %v", versionId, err)
+	}
+
+	return entry, nil
+}
+
+// deleteSpecificObjectVersion deletes a specific version of an object
+func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error {
+	if versionId == "" {
+		return fmt.Errorf("version ID is required for version-specific deletion")
+	}
+
+	versionsDir := s3a.getVersionedObjectDir(bucket, object)
+	versionFile := s3a.getVersionFileName(versionId)
+
+	// Delete the specific version from .versions directory
+	_, err := s3a.getEntry(versionsDir, versionFile)
+	if err != nil {
+		return fmt.Errorf("version %s not found: %v", versionId, err)
+	}
+
+	// Version exists, delete it
+	deleteErr := s3a.rm(versionsDir, versionFile, true, false)
+	if deleteErr != nil {
+		// Check if file was already deleted by another process
+		if _, checkErr := s3a.getEntry(versionsDir, versionFile); checkErr != nil {
+			// File doesn't exist anymore, deletion was successful
+			return nil
+		}
+		return fmt.Errorf("failed to delete version %s: %v", versionId, deleteErr)
+	}
+	return nil
+}
+
+// ListObjectVersionsHandler handles the list object versions request
+// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
+func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
+	bucket, _ := s3_constants.GetBucketAndObject(r)
+	glog.V(3).Infof("ListObjectVersionsHandler %s", bucket)
+
+	if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
+		s3err.WriteErrorResponse(w, r, err)
+		return
+	}
+
+	// Parse query parameters
+	query := r.URL.Query()
+	prefix := query.Get("prefix")
+	if prefix != "" && !strings.HasPrefix(prefix, "/") {
+		prefix = "/" + prefix
+	}
+
+	keyMarker := query.Get("key-marker")
+	versionIdMarker := query.Get("version-id-marker")
+	delimiter := query.Get("delimiter")
+
+	maxKeysStr := query.Get("max-keys")
+	maxKeys := 1000
+	if maxKeysStr != "" {
+		if mk, err := strconv.Atoi(maxKeysStr); err == nil && mk > 0 {
+			maxKeys = mk
+		}
+	}
+
+	// List versions
+	result, err := s3a.listObjectVersions(bucket, prefix, keyMarker, versionIdMarker, delimiter, maxKeys)
+	if err != nil {
+		glog.Errorf("ListObjectVersionsHandler: %v", err)
+		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+		return
+	}
+
+	writeSuccessResponseXML(w, r, result)
+}
+
+// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
+func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
+	bucketDir := s3a.option.BucketsPath + "/" + bucket
+	versionsObjectPath := object + ".versions"
+
+	// Get the .versions directory entry to read latest version metadata
+	versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get .versions directory: %v", err)
+	}
+
+	// Check if directory has latest version metadata
+	if versionsEntry.Extended == nil {
+		return nil, fmt.Errorf("no version metadata found in .versions directory for %s/%s", bucket, object)
+	}
+
+	latestVersionIdBytes, hasLatestVersionId := versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey]
+	latestVersionFileBytes, hasLatestVersionFile := versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey]
+
+	if !hasLatestVersionId || !hasLatestVersionFile {
+		return nil, fmt.Errorf("incomplete latest version metadata in .versions directory for %s/%s", bucket, object)
+	}
+
+	latestVersionId := string(latestVersionIdBytes)
+	latestVersionFile := string(latestVersionFileBytes)
+
+	glog.V(2).Infof("getLatestObjectVersion: found latest version %s (file: %s) for %s/%s", latestVersionId, latestVersionFile, bucket, object)
+
+	// Get the actual latest version file entry
+	latestVersionPath := versionsObjectPath + "/" + latestVersionFile
+	latestVersionEntry, err := s3a.getEntry(bucketDir, latestVersionPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get latest version file %s: %v", latestVersionPath, err)
+	}
+
+	return latestVersionEntry, nil
+}
