diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-09 01:51:45 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-09 01:51:45 -0700 |
| commit | cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba (patch) | |
| tree | 3fb6c49d5a32e7a0518c268b984188e918c5e5ac /weed/s3api/s3api_object_handlers_put.go | |
| parent | 8fa1a69f8c915311326e75645681d10f66d9e222 (diff) | |
| download | seaweedfs-cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba.tar.xz seaweedfs-cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba.zip | |
S3: add object versioning (#6945)
* add object versioning
* add missing file
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* ListObjectVersionsResult is better to show multiple version entries
* fix test
* Update weed/s3api/s3api_object_handlers_put.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/s3api/s3api_object_versioning.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* multiple improvements
* move PutBucketVersioningHandler into weed/s3api/s3api_bucket_handlers.go file
* duplicated code for reading bucket config, versioningEnabled, etc. try to use functions
* opportunity to cache bucket config
* error handling if bucket is not found
* in case bucket is not found
* fix build
* add object versioning tests
* remove non-existent tests
* add tests
* add versioning tests
* skip a new test
* ensure .versions directory exists before saving info into it
* fix creating version entry
* logging on creating version directory
* Update s3api_object_versioning_test.go
* retry and wait for directory creation
* revert add more logging
* Update s3api_object_versioning.go
* more debug messages
* clean up logs, and touch directory correctly
* log the .versions creation and then parent directory listing
* use mkFile instead of touch
touch is for update
* clean up data
* add versioning test in go
* change location
* if modified, latest version is moved to .versions directory, and create a new latest version
Core versioning functionality: WORKING
TestVersioningBasicWorkflow - PASS
TestVersioningDeleteMarkers - PASS
TestVersioningMultipleVersionsSameObject - PASS
TestVersioningDeleteAndRecreate - PASS
TestVersioningListWithPagination - PASS
❌ Some advanced features still failing:
ETag calculation issues (using mtime instead of proper MD5)
Specific version retrieval (EOF error)
Version deletion (internal errors)
Concurrent operations (race conditions)
* calculate multi chunk md5
Test Results - All Passing:
✅ TestBucketListReturnDataVersioning - PASS
✅ TestVersioningCreateObjectsInOrder - PASS
✅ TestVersioningBasicWorkflow - PASS
✅ TestVersioningMultipleVersionsSameObject - PASS
✅ TestVersioningDeleteMarkers - PASS
* dedupe
* fix TestVersioningErrorCases
* fix eof error of reading old versions
* get specific version also check current version
* enable integration tests for versioning
* trigger action to work for now
* Fix GitHub Actions S3 versioning tests workflow
- Fix syntax error (incorrect indentation)
- Update directory paths from weed/s3api/versioning_tests/ to test/s3/versioning/
- Add push trigger for add-object-versioning branch to enable CI during development
- Update artifact paths to match correct directory structure
* Improve CI robustness for S3 versioning tests
Makefile improvements:
- Increase server startup timeout from 30s to 90s for CI environments
- Add progressive timeout reporting (logs at 30s, full logs at 90s)
- Better error handling with server logs on failure
- Add server PID tracking for debugging
- Improved test failure reporting
GitHub Actions workflow improvements:
- Increase job timeouts to account for CI environment delays
- Add system information logging (memory, disk space)
- Add detailed failure reporting with server logs
- Add process and network diagnostics on failure
- Better error messaging and log collection
These changes should resolve the 'Server failed to start within 30 seconds' issue
that was causing the CI tests to fail.
* adjust testing volume size
* Update Makefile
* Update Makefile
* Update Makefile
* Update Makefile
* Update s3-versioning-tests.yml
* Update s3api_object_versioning.go
* Update Makefile
* do not clean up
* log received version id
* more logs
* printout response
* print out list version response
* use tmp files when put versioned object
* change to versions folder layout
* Delete weed-test.log
* test with mixed versioned and unversioned objects
* remove versionDirCache
* remove unused functions
* remove unused function
* remove fallback checking
* minor
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_put.go | 157 |
1 files changed, 148 insertions, 9 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 0b0be5fe5..8b85a049a 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -71,19 +71,53 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } } else { - uploadUrl := s3a.toFilerUrl(bucket, object) - if objectContentType == "" { - dataReader = mimeDetect(r, dataReader) + // Check if versioning is enabled for the bucket + versioningEnabled, err := s3a.isVersioningEnabled(bucket) + if err != nil { + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + return + } + glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return } - etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket) + glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled) - if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, r, errCode) - return - } + if versioningEnabled { + // Handle versioned PUT + glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object) + versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } + + // Set version ID in response header + if versionId != "" { + w.Header().Set("x-amz-version-id", versionId) + } + + // Set ETag in response + setEtag(w, etag) + } else { + // Handle regular PUT (non-versioned) + glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object) + uploadUrl := s3a.toFilerUrl(bucket, object) + if objectContentType == "" { + dataReader = mimeDetect(r, dataReader) + } + + etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket) + + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } - setEtag(w, etag) + setEtag(w, etag) + } } stats_collect.RecordBucketActiveTime(bucket) stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc() @@ -195,3 +229,108 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string } return string(encodedJwt) } + +// putVersionedObject handles PUT operations for versioned buckets using the new layout +// where all versions (including latest) are stored in the .versions directory +func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) { + // Generate version ID + versionId = generateVersionId() + + glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object) + + // Create the version file name + versionFileName := s3a.getVersionFileName(versionId) + + // Upload directly to the versions directory + // We need to construct the object path relative to the bucket + versionObjectPath := object + ".versions/" + versionFileName + versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath) + + hash := md5.New() + var body = io.TeeReader(dataReader, hash) + if objectContentType == "" { + body = mimeDetect(r, body) + } + + glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl) + + etag, errCode = s3a.putToFiler(r, versionUploadUrl, body, "", bucket) + if errCode != s3err.ErrNone { + glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) + return "", "", errCode + } + + // Get the uploaded entry to add versioning metadata + bucketDir := s3a.option.BucketsPath + "/" + bucket + versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath) + if err != nil { + glog.Errorf("putVersionedObject: failed to get version entry: %v", err) + return "", "", s3err.ErrInternalError + } + + // Add versioning metadata to this version + if versionEntry.Extended == nil { + versionEntry.Extended = make(map[string][]byte) + } + versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) + + // Store ETag with quotes for S3 compatibility + if !strings.HasPrefix(etag, "\"") { + etag = "\"" + etag + "\"" + } + versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) + + // Update the version entry with metadata + err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = versionEntry.Extended + updatedEntry.Attributes = versionEntry.Attributes + updatedEntry.Chunks = versionEntry.Chunks + }) + if err != nil { + glog.Errorf("putVersionedObject: failed to update version metadata: %v", err) + return "", "", s3err.ErrInternalError + } + + // Update the .versions directory metadata to indicate this is the latest version + err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName) + if err != nil { + glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) + return "", "", s3err.ErrInternalError + } + + glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object) + return versionId, etag, s3err.ErrNone +} + +// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version +func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error { + bucketDir := s3a.option.BucketsPath + "/" + bucket + versionsObjectPath := object + ".versions" + + // Get the current .versions directory entry + versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath) + if err != nil { + glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err) + return fmt.Errorf("failed to get .versions entry: %v", err) + } + + // Add or update the latest version metadata + if versionsEntry.Extended == nil { + versionsEntry.Extended = make(map[string][]byte) + } + versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(versionId) + versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(versionFileName) + + // Update the .versions directory entry with metadata + err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = versionsEntry.Extended + updatedEntry.Attributes = versionsEntry.Attributes + updatedEntry.Chunks = versionsEntry.Chunks + }) + if err != nil { + glog.Errorf("updateLatestVersionInDirectory: failed to update .versions directory metadata: %v", err) + return fmt.Errorf("failed to update .versions directory metadata: %v", err) + } + + return nil +} |
