aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_put.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-07-09 01:51:45 -0700
committerGitHub <noreply@github.com>2025-07-09 01:51:45 -0700
commitcf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba (patch)
tree3fb6c49d5a32e7a0518c268b984188e918c5e5ac /weed/s3api/s3api_object_handlers_put.go
parent8fa1a69f8c915311326e75645681d10f66d9e222 (diff)
downloadseaweedfs-cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba.tar.xz
seaweedfs-cf5a24983a0d6a5b6955f5cded4d5e1a4c6484ba.zip
S3: add object versioning (#6945)
* add object versioning * add missing file * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * ListObjectVersionsResult is better to show multiple version entries * fix test * Update weed/s3api/s3api_object_handlers_put.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update weed/s3api/s3api_object_versioning.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * multiple improvements * move PutBucketVersioningHandler into weed/s3api/s3api_bucket_handlers.go file * duplicated code for reading bucket config, versioningEnabled, etc. try to use functions * opportunity to cache bucket config * error handling if bucket is not found * in case bucket is not found * fix build * add object versioning tests * remove non-existent tests * add tests * add versioning tests * skip a new test * ensure .versions directory exists before saving info into it * fix creating version entry * logging on creating version directory * Update s3api_object_versioning_test.go * retry and wait for directory creation * revert add more logging * Update s3api_object_versioning.go * more debug messages * clean up logs, and touch directory correctly * log the .versions creation and then parent directory listing * use mkFile instead of touch touch is for update * clean up data * add versioning test in go * change location * if modified, latest version is moved to .versions directory, and create a new latest version Core versioning functionality: WORKING TestVersioningBasicWorkflow - PASS TestVersioningDeleteMarkers - PASS TestVersioningMultipleVersionsSameObject - PASS TestVersioningDeleteAndRecreate - PASS TestVersioningListWithPagination - PASS ❌ Some advanced features still failing: ETag calculation issues (using mtime instead of proper MD5) Specific version retrieval (EOF error) Version deletion (internal errors) Concurrent operations (race conditions) * calculate multi chunk md5 Test Results - All Passing: ✅ TestBucketListReturnDataVersioning - PASS ✅ TestVersioningCreateObjectsInOrder - PASS ✅ TestVersioningBasicWorkflow - PASS ✅ TestVersioningMultipleVersionsSameObject - PASS ✅ TestVersioningDeleteMarkers - PASS * dedupe * fix TestVersioningErrorCases * fix eof error of reading old versions * get specific version also check current version * enable integration tests for versioning * trigger action to work for now * Fix GitHub Actions S3 versioning tests workflow - Fix syntax error (incorrect indentation) - Update directory paths from weed/s3api/versioning_tests/ to test/s3/versioning/ - Add push trigger for add-object-versioning branch to enable CI during development - Update artifact paths to match correct directory structure * Improve CI robustness for S3 versioning tests Makefile improvements: - Increase server startup timeout from 30s to 90s for CI environments - Add progressive timeout reporting (logs at 30s, full logs at 90s) - Better error handling with server logs on failure - Add server PID tracking for debugging - Improved test failure reporting GitHub Actions workflow improvements: - Increase job timeouts to account for CI environment delays - Add system information logging (memory, disk space) - Add detailed failure reporting with server logs - Add process and network diagnostics on failure - Better error messaging and log collection These changes should resolve the 'Server failed to start within 30 seconds' issue that was causing the CI tests to fail. * adjust testing volume size * Update Makefile * Update Makefile * Update Makefile * Update Makefile * Update s3-versioning-tests.yml * Update s3api_object_versioning.go * Update Makefile * do not clean up * log received version id * more logs * printout response * print out list version response * use tmp files when put versioned object * change to versions folder layout * Delete weed-test.log * test with mixed versioned and unversioned objects * remove versionDirCache * remove unused functions * remove unused function * remove fallback checking * minor --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--weed/s3api/s3api_object_handlers_put.go157
1 files changed, 148 insertions, 9 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 0b0be5fe5..8b85a049a 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -71,19 +71,53 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
} else {
- uploadUrl := s3a.toFilerUrl(bucket, object)
- if objectContentType == "" {
- dataReader = mimeDetect(r, dataReader)
+ // Check if versioning is enabled for the bucket
+ versioningEnabled, err := s3a.isVersioningEnabled(bucket)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
+ return
+ }
+ glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
}
- etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
+ glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled)
- if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
+ if versioningEnabled {
+ // Handle versioned PUT
+ glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
+ versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
+ if errCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, errCode)
+ return
+ }
+
+ // Set version ID in response header
+ if versionId != "" {
+ w.Header().Set("x-amz-version-id", versionId)
+ }
+
+ // Set ETag in response
+ setEtag(w, etag)
+ } else {
+ // Handle regular PUT (non-versioned)
+ glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
+ uploadUrl := s3a.toFilerUrl(bucket, object)
+ if objectContentType == "" {
+ dataReader = mimeDetect(r, dataReader)
+ }
+
+ etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
+
+ if errCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, errCode)
+ return
+ }
- setEtag(w, etag)
+ setEtag(w, etag)
+ }
}
stats_collect.RecordBucketActiveTime(bucket)
stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()
@@ -195,3 +229,108 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string
}
return string(encodedJwt)
}
+
+// putVersionedObject handles PUT operations for versioned buckets using the new layout
+// where all versions (including latest) are stored in the .versions directory
+func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
+ // Generate version ID
+ versionId = generateVersionId()
+
+ glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object)
+
+ // Create the version file name
+ versionFileName := s3a.getVersionFileName(versionId)
+
+ // Upload directly to the versions directory
+ // We need to construct the object path relative to the bucket
+ versionObjectPath := object + ".versions/" + versionFileName
+ versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
+
+ hash := md5.New()
+ var body = io.TeeReader(dataReader, hash)
+ if objectContentType == "" {
+ body = mimeDetect(r, body)
+ }
+
+ glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
+
+ etag, errCode = s3a.putToFiler(r, versionUploadUrl, body, "", bucket)
+ if errCode != s3err.ErrNone {
+ glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
+ return "", "", errCode
+ }
+
+ // Get the uploaded entry to add versioning metadata
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath)
+ if err != nil {
+ glog.Errorf("putVersionedObject: failed to get version entry: %v", err)
+ return "", "", s3err.ErrInternalError
+ }
+
+ // Add versioning metadata to this version
+ if versionEntry.Extended == nil {
+ versionEntry.Extended = make(map[string][]byte)
+ }
+ versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+
+ // Store ETag with quotes for S3 compatibility
+ if !strings.HasPrefix(etag, "\"") {
+ etag = "\"" + etag + "\""
+ }
+ versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
+
+ // Update the version entry with metadata
+ err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
+ updatedEntry.Extended = versionEntry.Extended
+ updatedEntry.Attributes = versionEntry.Attributes
+ updatedEntry.Chunks = versionEntry.Chunks
+ })
+ if err != nil {
+ glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
+ return "", "", s3err.ErrInternalError
+ }
+
+ // Update the .versions directory metadata to indicate this is the latest version
+ err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName)
+ if err != nil {
+ glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
+ return "", "", s3err.ErrInternalError
+ }
+
+ glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object)
+ return versionId, etag, s3err.ErrNone
+}
+
+// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
+func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error {
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionsObjectPath := object + ".versions"
+
+ // Get the current .versions directory entry
+ versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ if err != nil {
+ glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err)
+ return fmt.Errorf("failed to get .versions entry: %v", err)
+ }
+
+ // Add or update the latest version metadata
+ if versionsEntry.Extended == nil {
+ versionsEntry.Extended = make(map[string][]byte)
+ }
+ versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(versionId)
+ versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(versionFileName)
+
+ // Update the .versions directory entry with metadata
+ err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
+ updatedEntry.Extended = versionsEntry.Extended
+ updatedEntry.Attributes = versionsEntry.Attributes
+ updatedEntry.Chunks = versionsEntry.Chunks
+ })
+ if err != nil {
+ glog.Errorf("updateLatestVersionInDirectory: failed to update .versions directory metadata: %v", err)
+ return fmt.Errorf("failed to update .versions directory metadata: %v", err)
+ }
+
+ return nil
+}