aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_put.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--weed/s3api/s3api_object_handlers_put.go284
1 files changed, 225 insertions, 59 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 6a846120a..fb7d6c3a6 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -65,12 +65,6 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
bucket, object := s3_constants.GetBucketAndObject(r)
- authHeader := r.Header.Get("Authorization")
- authPreview := authHeader
- if len(authHeader) > 50 {
- authPreview = authHeader[:50] + "..."
- }
- glog.V(0).Infof("PutObjectHandler: Starting PUT %s/%s (Auth: %s)", bucket, object, authPreview)
glog.V(3).Infof("PutObjectHandler %s %s", bucket, object)
_, err := validateContentMd5(r.Header)
@@ -141,7 +135,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
versioningConfigured := (versioningState != "")
- glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningState=%s", bucket, object, versioningState)
+ glog.V(0).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
// Validate object lock headers before processing
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
@@ -163,37 +157,41 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
if versioningState == s3_constants.VersioningEnabled {
// Handle enabled versioning - create new versions with real version IDs
- glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
+ glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
+ glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
s3err.WriteErrorResponse(w, r, errCode)
return
}
+ glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
+
// Set version ID in response header
if versionId != "" {
w.Header().Set("x-amz-version-id", versionId)
+ glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
+ } else {
+ glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object)
}
// Set ETag in response
setEtag(w, etag)
} else if versioningState == s3_constants.VersioningSuspended {
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
- glog.V(1).Infof("PutObjectHandler: using suspended versioning PUT for %s/%s", bucket, object)
etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
- // Note: Suspended versioning should NOT return x-amz-version-id header according to AWS S3 spec
+ // Note: Suspended versioning should NOT return x-amz-version-id header per AWS S3 spec
// The object is stored with "null" version internally but no version header is returned
// Set ETag in response
setEtag(w, etag)
} else {
// Handle regular PUT (never configured versioning)
- glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
uploadUrl := s3a.toFilerUrl(bucket, object)
if objectContentType == "" {
dataReader = mimeDetect(r, dataReader)
@@ -298,6 +296,11 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
}
}
+ // Log version ID header for debugging
+ if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
+ glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl)
+ }
+
// Set object owner header for filer to extract
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
@@ -427,65 +430,186 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
}
}
-// putVersionedObject handles PUT operations for versioned buckets using the new layout
-// where all versions (including latest) are stored in the .versions directory
+// putSuspendedVersioningObject handles PUT operations for buckets with suspended versioning.
+//
+// Key architectural approach:
+// Instead of creating the file and then updating its metadata (which can cause race conditions and duplicate versions),
+// we set all required metadata as HTTP headers BEFORE calling putToFiler. The filer automatically stores any header
+// starting with "Seaweed-" in entry.Extended during file creation, ensuring atomic metadata persistence.
+//
+// This approach eliminates:
+// - Race conditions from read-after-write consistency delays
+// - Need for retry loops and exponential backoff
+// - Duplicate entries from separate create/update operations
+//
+// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
+// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
- // For suspended versioning, store as regular object (version ID "null") but preserve existing versions
- glog.V(2).Infof("putSuspendedVersioningObject: creating null version for %s/%s", bucket, object)
+ // Normalize object path to ensure consistency with toFilerUrl behavior
+ normalizedObject := removeDuplicateSlashes(object)
+
+ // Enable detailed logging for testobjbar
+ isTestObj := (normalizedObject == "testobjbar")
+
+ glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
+ bucket, object, normalizedObject, isTestObj)
- uploadUrl := s3a.toFilerUrl(bucket, object)
+ if isTestObj {
+ glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
+ }
+
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+
+ // Check if there's an existing null version in .versions directory and delete it
+ // This ensures suspended versioning properly overwrites the null version as per S3 spec
+ // Note: We only delete null versions, NOT regular versions (those should be preserved)
+ versionsObjectPath := normalizedObject + ".versions"
+ versionsDir := bucketDir + "/" + versionsObjectPath
+ entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
+ if err == nil {
+ // .versions directory exists
+ glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
+ for _, entry := range entries {
+ if entry.Extended != nil {
+ if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
+ versionId := string(versionIdBytes)
+ glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
+ if versionId == "null" {
+ // Only delete null version - preserve real versioned entries
+ glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions")
+ err := s3a.rm(versionsDir, entry.Name, true, false)
+ if err != nil {
+ glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err)
+ } else {
+ glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version")
+ }
+ break
+ }
+ }
+ }
+ }
+ } else {
+ glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
+ }
+
+ uploadUrl := s3a.toFilerUrl(bucket, normalizedObject)
+
+ hash := md5.New()
+ var body = io.TeeReader(dataReader, hash)
if objectContentType == "" {
- dataReader = mimeDetect(r, dataReader)
+ body = mimeDetect(r, body)
}
- etag, errCode, _ = s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
- if errCode != s3err.ErrNone {
- glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
- return "", errCode
+ // Set all metadata headers BEFORE calling putToFiler
+ // This ensures the metadata is set during file creation, not after
+ // The filer automatically stores any header starting with "Seaweed-" in entry.Extended
+
+ // Set version ID to "null" for suspended versioning
+ r.Header.Set(s3_constants.ExtVersionIdKey, "null")
+ if isTestObj {
+ glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
+ s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey))
}
- // Get the uploaded entry to add version metadata indicating this is "null" version
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- entry, err := s3a.getEntry(bucketDir, object)
- if err != nil {
- glog.Errorf("putSuspendedVersioningObject: failed to get object entry: %v", err)
- return "", s3err.ErrInternalError
+ // Extract and set object lock metadata as headers
+ // This handles retention mode, retention date, and legal hold
+ explicitMode := r.Header.Get(s3_constants.AmzObjectLockMode)
+ explicitRetainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate)
+
+ if explicitMode != "" {
+ r.Header.Set(s3_constants.ExtObjectLockModeKey, explicitMode)
+ glog.V(2).Infof("putSuspendedVersioningObject: setting object lock mode header: %s", explicitMode)
}
- // Add metadata to indicate this is a "null" version for suspended versioning
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
+ if explicitRetainUntilDate != "" {
+ // Parse and convert to Unix timestamp
+ parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
+ if err != nil {
+ glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
+ return "", s3err.ErrInvalidRequest
+ }
+ r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
+ glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
}
- entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
- // Set object owner for suspended versioning objects
- s3a.setObjectOwnerFromRequest(r, entry)
+ if legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold); legalHold != "" {
+ if legalHold == s3_constants.LegalHoldOn || legalHold == s3_constants.LegalHoldOff {
+ r.Header.Set(s3_constants.ExtLegalHoldKey, legalHold)
+ glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
+ } else {
+ glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
+ return "", s3err.ErrInvalidRequest
+ }
+ }
- // Extract and store object lock metadata from request headers (if any)
- if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
- glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err)
- return "", s3err.ErrInvalidRequest
+ // Apply bucket default retention if no explicit retention was provided
+ if explicitMode == "" && explicitRetainUntilDate == "" {
+ // Create a temporary entry to apply defaults
+ tempEntry := &filer_pb.Entry{Extended: make(map[string][]byte)}
+ if err := s3a.applyBucketDefaultRetention(bucket, tempEntry); err == nil {
+ // Copy default retention headers from temp entry
+ if modeBytes, ok := tempEntry.Extended[s3_constants.ExtObjectLockModeKey]; ok {
+ r.Header.Set(s3_constants.ExtObjectLockModeKey, string(modeBytes))
+ glog.V(2).Infof("putSuspendedVersioningObject: applied bucket default retention mode: %s", string(modeBytes))
+ }
+ if dateBytes, ok := tempEntry.Extended[s3_constants.ExtRetentionUntilDateKey]; ok {
+ r.Header.Set(s3_constants.ExtRetentionUntilDateKey, string(dateBytes))
+ glog.V(2).Infof("putSuspendedVersioningObject: applied bucket default retention date")
+ }
+ }
}
- // Update the entry with metadata
- err = s3a.mkFile(bucketDir, object, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
- updatedEntry.Extended = entry.Extended
- updatedEntry.Attributes = entry.Attributes
- updatedEntry.Chunks = entry.Chunks
- })
- if err != nil {
- glog.Errorf("putSuspendedVersioningObject: failed to update object metadata: %v", err)
- return "", s3err.ErrInternalError
+ // Upload the file using putToFiler - this will create the file with version metadata
+ if isTestObj {
+ glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===")
+ }
+ etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
+ if errCode != s3err.ErrNone {
+ glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
+ return "", errCode
+ }
+ if isTestObj {
+ glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
+ }
+
+ // Verify the metadata was set correctly during file creation
+ if isTestObj {
+ // Read back the entry to verify
+ maxRetries := 3
+ for attempt := 1; attempt <= maxRetries; attempt++ {
+ verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject)
+ if verifyErr == nil {
+ glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
+ if verifyEntry.Extended != nil {
+ if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok {
+ glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
+ } else {
+ glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
+ }
+ } else {
+ glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
+ }
+ break
+ } else {
+ glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
+ }
+ if attempt < maxRetries {
+ time.Sleep(time.Millisecond * 10)
+ }
+ }
}
// Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
- err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, object)
+ err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, normalizedObject)
if err != nil {
glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err)
// Don't fail the request, but log the warning
}
glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
+ if isTestObj {
+ glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
+ }
return etag, s3err.ErrNone
}
@@ -562,16 +686,30 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// Generate version ID
versionId = generateVersionId()
- glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object)
+ // Normalize object path to ensure consistency with toFilerUrl behavior
+ normalizedObject := removeDuplicateSlashes(object)
+
+ glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
// Create the version file name
versionFileName := s3a.getVersionFileName(versionId)
// Upload directly to the versions directory
// We need to construct the object path relative to the bucket
- versionObjectPath := object + ".versions/" + versionFileName
+ versionObjectPath := normalizedObject + ".versions/" + versionFileName
versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
+ // Ensure the .versions directory exists before uploading
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionsDir := normalizedObject + ".versions"
+ err := s3a.mkdir(bucketDir, versionsDir, func(entry *filer_pb.Entry) {
+ entry.Attributes.Mime = s3_constants.FolderMimeType
+ })
+ if err != nil {
+ glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
+ return "", "", s3err.ErrInternalError
+ }
+
hash := md5.New()
var body = io.TeeReader(dataReader, hash)
if objectContentType == "" {
@@ -587,10 +725,24 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
}
// Get the uploaded entry to add versioning metadata
- bucketDir := s3a.option.BucketsPath + "/" + bucket
- versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath)
+ // Use retry logic to handle filer consistency delays
+ var versionEntry *filer_pb.Entry
+ maxRetries := 8
+ for attempt := 1; attempt <= maxRetries; attempt++ {
+ versionEntry, err = s3a.getEntry(bucketDir, versionObjectPath)
+ if err == nil {
+ break
+ }
+
+ if attempt < maxRetries {
+ // Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms
+ delay := time.Millisecond * time.Duration(10*(1<<(attempt-1)))
+ time.Sleep(delay)
+ }
+ }
+
if err != nil {
- glog.Errorf("putVersionedObject: failed to get version entry: %v", err)
+ glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
return "", "", s3err.ErrInternalError
}
@@ -627,13 +779,12 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
}
// Update the .versions directory metadata to indicate this is the latest version
- err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName)
+ err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
if err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
return "", "", s3err.ErrInternalError
}
-
- glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object)
+ glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
return versionId, etag, s3err.ErrNone
}
@@ -642,11 +793,26 @@ func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId
bucketDir := s3a.option.BucketsPath + "/" + bucket
versionsObjectPath := object + ".versions"
- // Get the current .versions directory entry
- versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ // Get the current .versions directory entry with retry logic for filer consistency
+ var versionsEntry *filer_pb.Entry
+ var err error
+ maxRetries := 8
+ for attempt := 1; attempt <= maxRetries; attempt++ {
+ versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath)
+ if err == nil {
+ break
+ }
+
+ if attempt < maxRetries {
+ // Exponential backoff with higher base: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 6400ms
+ delay := time.Millisecond * time.Duration(100*(1<<(attempt-1)))
+ time.Sleep(delay)
+ }
+ }
+
if err != nil {
- glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err)
- return fmt.Errorf("failed to get .versions entry: %w", err)
+ glog.Errorf("updateLatestVersionInDirectory: failed to get .versions directory for %s/%s after %d attempts: %v", bucket, object, maxRetries, err)
+ return fmt.Errorf("failed to get .versions directory after %d attempts: %w", maxRetries, err)
}
// Add or update the latest version metadata