diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_put.go | 284 |
1 files changed, 225 insertions, 59 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 6a846120a..fb7d6c3a6 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -65,12 +65,6 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html bucket, object := s3_constants.GetBucketAndObject(r) - authHeader := r.Header.Get("Authorization") - authPreview := authHeader - if len(authHeader) > 50 { - authPreview = authHeader[:50] + "..." - } - glog.V(0).Infof("PutObjectHandler: Starting PUT %s/%s (Auth: %s)", bucket, object, authPreview) glog.V(3).Infof("PutObjectHandler %s %s", bucket, object) _, err := validateContentMd5(r.Header) @@ -141,7 +135,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) versioningEnabled := (versioningState == s3_constants.VersioningEnabled) versioningConfigured := (versioningState != "") - glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningState=%s", bucket, object, versioningState) + glog.V(0).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured) // Validate object lock headers before processing if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil { @@ -163,37 +157,41 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) if versioningState == s3_constants.VersioningEnabled { // Handle enabled versioning - create new versions with real version IDs - glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object) + glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object) versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) if errCode != s3err.ErrNone { + glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object) s3err.WriteErrorResponse(w, r, errCode) return } + glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object) + // Set version ID in response header if versionId != "" { w.Header().Set("x-amz-version-id", versionId) + glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object) + } else { + glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object) } // Set ETag in response setEtag(w, etag) } else if versioningState == s3_constants.VersioningSuspended { // Handle suspended versioning - overwrite with "null" version ID but preserve existing versions - glog.V(1).Infof("PutObjectHandler: using suspended versioning PUT for %s/%s", bucket, object) etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } - // Note: Suspended versioning should NOT return x-amz-version-id header according to AWS S3 spec + // Note: Suspended versioning should NOT return x-amz-version-id header per AWS S3 spec // The object is stored with "null" version internally but no version header is returned // Set ETag in response setEtag(w, etag) } else { // Handle regular PUT (never configured versioning) - glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object) uploadUrl := s3a.toFilerUrl(bucket, object) if objectContentType == "" { dataReader = mimeDetect(r, dataReader) @@ -298,6 +296,11 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader } } + // Log version ID header for debugging + if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" { + glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl) + } + // Set object owner header for filer to extract amzAccountId := r.Header.Get(s3_constants.AmzAccountId) if amzAccountId != "" { @@ -427,65 +430,186 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_ } } -// putVersionedObject handles PUT operations for versioned buckets using the new layout -// where all versions (including latest) are stored in the .versions directory +// putSuspendedVersioningObject handles PUT operations for buckets with suspended versioning. +// +// Key architectural approach: +// Instead of creating the file and then updating its metadata (which can cause race conditions and duplicate versions), +// we set all required metadata as HTTP headers BEFORE calling putToFiler. The filer automatically stores any header +// starting with "Seaweed-" in entry.Extended during file creation, ensuring atomic metadata persistence. +// +// This approach eliminates: +// - Race conditions from read-after-write consistency delays +// - Need for retry loops and exponential backoff +// - Duplicate entries from separate create/update operations +// +// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory, +// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory. func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) { - // For suspended versioning, store as regular object (version ID "null") but preserve existing versions - glog.V(2).Infof("putSuspendedVersioningObject: creating null version for %s/%s", bucket, object) + // Normalize object path to ensure consistency with toFilerUrl behavior + normalizedObject := removeDuplicateSlashes(object) + + // Enable detailed logging for testobjbar + isTestObj := (normalizedObject == "testobjbar") + + glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v", + bucket, object, normalizedObject, isTestObj) - uploadUrl := s3a.toFilerUrl(bucket, object) + if isTestObj { + glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===") + } + + bucketDir := s3a.option.BucketsPath + "/" + bucket + + // Check if there's an existing null version in .versions directory and delete it + // This ensures suspended versioning properly overwrites the null version as per S3 spec + // Note: We only delete null versions, NOT regular versions (those should be preserved) + versionsObjectPath := normalizedObject + ".versions" + versionsDir := bucketDir + "/" + versionsObjectPath + entries, _, err := s3a.list(versionsDir, "", "", false, 1000) + if err == nil { + // .versions directory exists + glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object) + for _, entry := range entries { + if entry.Extended != nil { + if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { + versionId := string(versionIdBytes) + glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId) + if versionId == "null" { + // Only delete null version - preserve real versioned entries + glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions") + err := s3a.rm(versionsDir, entry.Name, true, false) + if err != nil { + glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err) + } else { + glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version") + } + break + } + } + } + } + } else { + glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) + } + + uploadUrl := s3a.toFilerUrl(bucket, normalizedObject) + + hash := md5.New() + var body = io.TeeReader(dataReader, hash) if objectContentType == "" { - dataReader = mimeDetect(r, dataReader) + body = mimeDetect(r, body) } - etag, errCode, _ = s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1) - if errCode != s3err.ErrNone { - glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) - return "", errCode + // Set all metadata headers BEFORE calling putToFiler + // This ensures the metadata is set during file creation, not after + // The filer automatically stores any header starting with "Seaweed-" in entry.Extended + + // Set version ID to "null" for suspended versioning + r.Header.Set(s3_constants.ExtVersionIdKey, "null") + if isTestObj { + glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===", + s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey)) } - // Get the uploaded entry to add version metadata indicating this is "null" version - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err := s3a.getEntry(bucketDir, object) - if err != nil { - glog.Errorf("putSuspendedVersioningObject: failed to get object entry: %v", err) - return "", s3err.ErrInternalError + // Extract and set object lock metadata as headers + // This handles retention mode, retention date, and legal hold + explicitMode := r.Header.Get(s3_constants.AmzObjectLockMode) + explicitRetainUntilDate := r.Header.Get(s3_constants.AmzObjectLockRetainUntilDate) + + if explicitMode != "" { + r.Header.Set(s3_constants.ExtObjectLockModeKey, explicitMode) + glog.V(2).Infof("putSuspendedVersioningObject: setting object lock mode header: %s", explicitMode) } - // Add metadata to indicate this is a "null" version for suspended versioning - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) + if explicitRetainUntilDate != "" { + // Parse and convert to Unix timestamp + parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate) + if err != nil { + glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err) + return "", s3err.ErrInvalidRequest + } + r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10)) + glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix()) } - entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") - // Set object owner for suspended versioning objects - s3a.setObjectOwnerFromRequest(r, entry) + if legalHold := r.Header.Get(s3_constants.AmzObjectLockLegalHold); legalHold != "" { + if legalHold == s3_constants.LegalHoldOn || legalHold == s3_constants.LegalHoldOff { + r.Header.Set(s3_constants.ExtLegalHoldKey, legalHold) + glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold) + } else { + glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold) + return "", s3err.ErrInvalidRequest + } + } - // Extract and store object lock metadata from request headers (if any) - if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil { - glog.Errorf("putSuspendedVersioningObject: failed to extract object lock metadata: %v", err) - return "", s3err.ErrInvalidRequest + // Apply bucket default retention if no explicit retention was provided + if explicitMode == "" && explicitRetainUntilDate == "" { + // Create a temporary entry to apply defaults + tempEntry := &filer_pb.Entry{Extended: make(map[string][]byte)} + if err := s3a.applyBucketDefaultRetention(bucket, tempEntry); err == nil { + // Copy default retention headers from temp entry + if modeBytes, ok := tempEntry.Extended[s3_constants.ExtObjectLockModeKey]; ok { + r.Header.Set(s3_constants.ExtObjectLockModeKey, string(modeBytes)) + glog.V(2).Infof("putSuspendedVersioningObject: applied bucket default retention mode: %s", string(modeBytes)) + } + if dateBytes, ok := tempEntry.Extended[s3_constants.ExtRetentionUntilDateKey]; ok { + r.Header.Set(s3_constants.ExtRetentionUntilDateKey, string(dateBytes)) + glog.V(2).Infof("putSuspendedVersioningObject: applied bucket default retention date") + } + } } - // Update the entry with metadata - err = s3a.mkFile(bucketDir, object, entry.Chunks, func(updatedEntry *filer_pb.Entry) { - updatedEntry.Extended = entry.Extended - updatedEntry.Attributes = entry.Attributes - updatedEntry.Chunks = entry.Chunks - }) - if err != nil { - glog.Errorf("putSuspendedVersioningObject: failed to update object metadata: %v", err) - return "", s3err.ErrInternalError + // Upload the file using putToFiler - this will create the file with version metadata + if isTestObj { + glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===") + } + etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1) + if errCode != s3err.ErrNone { + glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) + return "", errCode + } + if isTestObj { + glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag) + } + + // Verify the metadata was set correctly during file creation + if isTestObj { + // Read back the entry to verify + maxRetries := 3 + for attempt := 1; attempt <= maxRetries; attempt++ { + verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject) + if verifyErr == nil { + glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended) + if verifyEntry.Extended != nil { + if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok { + glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes)) + } else { + glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===") + } + } else { + glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===") + } + break + } else { + glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr) + } + if attempt < maxRetries { + time.Sleep(time.Millisecond * 10) + } + } } // Update all existing versions/delete markers to set IsLatest=false since "null" is now latest - err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, object) + err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, normalizedObject) if err != nil { glog.Warningf("putSuspendedVersioningObject: failed to update IsLatest flags: %v", err) // Don't fail the request, but log the warning } glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object) + if isTestObj { + glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===") + } return etag, s3err.ErrNone } @@ -562,16 +686,30 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Generate version ID versionId = generateVersionId() - glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object) + // Normalize object path to ensure consistency with toFilerUrl behavior + normalizedObject := removeDuplicateSlashes(object) + + glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) // Create the version file name versionFileName := s3a.getVersionFileName(versionId) // Upload directly to the versions directory // We need to construct the object path relative to the bucket - versionObjectPath := object + ".versions/" + versionFileName + versionObjectPath := normalizedObject + ".versions/" + versionFileName versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath) + // Ensure the .versions directory exists before uploading + bucketDir := s3a.option.BucketsPath + "/" + bucket + versionsDir := normalizedObject + ".versions" + err := s3a.mkdir(bucketDir, versionsDir, func(entry *filer_pb.Entry) { + entry.Attributes.Mime = s3_constants.FolderMimeType + }) + if err != nil { + glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err) + return "", "", s3err.ErrInternalError + } + hash := md5.New() var body = io.TeeReader(dataReader, hash) if objectContentType == "" { @@ -587,10 +725,24 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin } // Get the uploaded entry to add versioning metadata - bucketDir := s3a.option.BucketsPath + "/" + bucket - versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath) + // Use retry logic to handle filer consistency delays + var versionEntry *filer_pb.Entry + maxRetries := 8 + for attempt := 1; attempt <= maxRetries; attempt++ { + versionEntry, err = s3a.getEntry(bucketDir, versionObjectPath) + if err == nil { + break + } + + if attempt < maxRetries { + // Exponential backoff: 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms + delay := time.Millisecond * time.Duration(10*(1<<(attempt-1))) + time.Sleep(delay) + } + } + if err != nil { - glog.Errorf("putVersionedObject: failed to get version entry: %v", err) + glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err) return "", "", s3err.ErrInternalError } @@ -627,13 +779,12 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin } // Update the .versions directory metadata to indicate this is the latest version - err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName) + err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName) if err != nil { glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) return "", "", s3err.ErrInternalError } - - glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object) + glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) return versionId, etag, s3err.ErrNone } @@ -642,11 +793,26 @@ func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId bucketDir := s3a.option.BucketsPath + "/" + bucket versionsObjectPath := object + ".versions" - // Get the current .versions directory entry - versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath) + // Get the current .versions directory entry with retry logic for filer consistency + var versionsEntry *filer_pb.Entry + var err error + maxRetries := 8 + for attempt := 1; attempt <= maxRetries; attempt++ { + versionsEntry, err = s3a.getEntry(bucketDir, versionsObjectPath) + if err == nil { + break + } + + if attempt < maxRetries { + // Exponential backoff with higher base: 100ms, 200ms, 400ms, 800ms, 1600ms, 3200ms, 6400ms + delay := time.Millisecond * time.Duration(100*(1<<(attempt-1))) + time.Sleep(delay) + } + } + if err != nil { - glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err) - return fmt.Errorf("failed to get .versions entry: %w", err) + glog.Errorf("updateLatestVersionInDirectory: failed to get .versions directory for %s/%s after %d attempts: %v", bucket, object, maxRetries, err) + return fmt.Errorf("failed to get .versions directory after %d attempts: %w", maxRetries, err) } // Add or update the latest version metadata |
