aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_put.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--weed/s3api/s3api_object_handlers_put.go159
1 files changed, 152 insertions, 7 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 011a039d3..b048cb663 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -95,8 +95,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
} else {
- // Check if versioning is enabled for the bucket
- versioningEnabled, err := s3a.isVersioningEnabled(bucket)
+ // Get detailed versioning state for the bucket
+ versioningState, err := s3a.getVersioningState(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
@@ -107,7 +107,10 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
- glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled)
+ versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
+ versioningConfigured := (versioningState != "")
+
+ glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningState=%s", bucket, object, versioningState)
// Validate object lock headers before processing
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
@@ -118,7 +121,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// For non-versioned buckets, check if existing object has object lock protections
// that would prevent overwrite (PUT operations overwrite existing objects in non-versioned buckets)
- if !versioningEnabled {
+ if !versioningConfigured {
governanceBypassAllowed := s3a.evaluateGovernanceBypassRequest(r, bucket, object)
if err := s3a.enforceObjectLockProtections(r, bucket, object, "", governanceBypassAllowed); err != nil {
glog.V(2).Infof("PutObjectHandler: object lock permissions check failed for %s/%s: %v", bucket, object, err)
@@ -127,8 +130,8 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
}
}
- if versioningEnabled {
- // Handle versioned PUT
+ if versioningState == s3_constants.VersioningEnabled {
+ // Handle enabled versioning - create new versions with real version IDs
glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
@@ -143,8 +146,22 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// Set ETag in response
setEtag(w, etag)
+ } else if versioningState == s3_constants.VersioningSuspended {
+ // Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
+ glog.V(1).Infof("PutObjectHandler: using suspended versioning PUT for %s/%s", bucket, object)
+ etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
+ if errCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, errCode)
+ return
+ }
+
+ // Note: Suspended versioning should NOT return x-amz-version-id header according to AWS S3 spec
+ // The object is stored with "null" version internally but no version header is returned
+
+ // Set ETag in response
+ setEtag(w, etag)
} else {
- // Handle regular PUT (non-versioned)
+ // Handle regular PUT (never configured versioning)
glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
uploadUrl := s3a.toFilerUrl(bucket, object)
if objectContentType == "" {
@@ -158,6 +175,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
+ // No version ID header for never-configured versioning
setEtag(w, etag)
}
}
@@ -274,6 +292,133 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string
// putVersionedObject handles PUT operations for versioned buckets using the new layout
// where all versions (including latest) are stored in the .versions directory
+func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
+ // For suspended versioning, store as regular object (version ID "null") but preserve existing versions
+ glog.V(2).Infof("putSuspendedVersioningObject: creating null version for %s/%s", bucket, object)
+
+ uploadUrl := s3a.toFilerUrl(bucket, object)
+ if objectContentType == "" {
+ dataReader = mimeDetect(r, dataReader)
+ }
+
+ etag, errCode = s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
+ if errCode != s3err.ErrNone {
+ glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
+ return "", errCode
+ }
+
+ // Get the uploaded entry to add version metadata indicating this is "null" version
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ entry, err := s3a.getEntry(bucketDir, object)
+ if err != nil {
+ glog.Errorf("putSuspendedVersioningObject: failed to get object entry: %v", err)
+ return "", s3err.ErrInternalError
+ }
+
+ // Add metadata to indicate this is a "null" version for suspended versioning
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
+
+ // Extract and store object lock metadata from request headers (if any)
+ if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
+ glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err)
+ return "", s3err.ErrInvalidRequest
+ }
+
+ // Update the entry with metadata
+ err = s3a.mkFile(bucketDir, object, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
+ updatedEntry.Extended = entry.Extended
+ updatedEntry.Attributes = entry.Attributes
+ updatedEntry.Chunks = entry.Chunks
+ })
+ if err != nil {
+ glog.Errorf("putSuspendedVersioningObject: failed to update object metadata: %v", err)
+ return "", s3err.ErrInternalError
+ }
+
+ // Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
+ err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, object)
+ if err != nil {
+ glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err)
+ // Don't fail the request, but log the warning
+ }
+
+ glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
+ return etag, s3err.ErrNone
+}
+
+// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
+// when a new "null" version becomes the latest during suspended versioning
+func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object string) error {
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ cleanObject := strings.TrimPrefix(object, "/")
+ versionsObjectPath := cleanObject + ".versions"
+ versionsDir := bucketDir + "/" + versionsObjectPath
+
+ glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: updating flags for %s/%s", bucket, cleanObject)
+
+ // Check if .versions directory exists
+ _, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ if err != nil {
+ // No .versions directory exists, nothing to update
+ glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: no .versions directory for %s/%s", bucket, cleanObject)
+ return nil
+ }
+
+ // List all entries in .versions directory
+ entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
+ if err != nil {
+ return fmt.Errorf("failed to list versions directory: %v", err)
+ }
+
+ glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: found %d entries to update", len(entries))
+
+ // Update each version/delete marker to set IsLatest=false
+ for _, entry := range entries {
+ if entry.Extended == nil {
+ continue
+ }
+
+ // Check if this entry has a version ID (it should be a version or delete marker)
+ versionIdBytes, hasVersionId := entry.Extended[s3_constants.ExtVersionIdKey]
+ if !hasVersionId {
+ continue
+ }
+
+ versionId := string(versionIdBytes)
+ glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: setting IsLatest=false for version %s", versionId)
+
+ // Update the entry to set IsLatest=false (we don't explicitly store this flag,
+ // it's determined by comparison with latest version metadata)
+ // We need to clear the latest version metadata from the .versions directory
+ // so that our getObjectVersionList function will correctly show IsLatest=false
+ }
+
+ // Clear the latest version metadata from .versions directory since "null" is now latest
+ versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ if err == nil && versionsEntry.Extended != nil {
+ // Remove latest version metadata so all versions show IsLatest=false
+ delete(versionsEntry.Extended, s3_constants.ExtLatestVersionIdKey)
+ delete(versionsEntry.Extended, s3_constants.ExtLatestVersionFileNameKey)
+
+ // Update the .versions directory entry
+ err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
+ updatedEntry.Extended = versionsEntry.Extended
+ updatedEntry.Attributes = versionsEntry.Attributes
+ updatedEntry.Chunks = versionsEntry.Chunks
+ })
+ if err != nil {
+ return fmt.Errorf("failed to update .versions directory metadata: %v", err)
+ }
+
+ glog.V(2).Infof("updateIsLatestFlagsForSuspendedVersioning: cleared latest version metadata for %s/%s", bucket, cleanObject)
+ }
+
+ return nil
+}
+
func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
// Generate version ID
versionId = generateVersionId()