diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_put.go | 157 |
1 files changed, 148 insertions, 9 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 0b0be5fe5..8b85a049a 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -71,19 +71,53 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) return } } else { - uploadUrl := s3a.toFilerUrl(bucket, object) - if objectContentType == "" { - dataReader = mimeDetect(r, dataReader) + // Check if versioning is enabled for the bucket + versioningEnabled, err := s3a.isVersioningEnabled(bucket) + if err != nil { + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + return + } + glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return } - etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket) + glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled) - if errCode != s3err.ErrNone { - s3err.WriteErrorResponse(w, r, errCode) - return - } + if versioningEnabled { + // Handle versioned PUT + glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object) + versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } + + // Set version ID in response header + if versionId != "" { + w.Header().Set("x-amz-version-id", versionId) + } + + // Set ETag in response + setEtag(w, etag) + } else { + // Handle regular PUT (non-versioned) + glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object) + uploadUrl := s3a.toFilerUrl(bucket, object) + if objectContentType == "" { + dataReader = mimeDetect(r, dataReader) + } + + etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket) + + if errCode != s3err.ErrNone { + s3err.WriteErrorResponse(w, r, errCode) + return + } - setEtag(w, etag) + setEtag(w, etag) + } } stats_collect.RecordBucketActiveTime(bucket) stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc() @@ -195,3 +229,108 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string } return string(encodedJwt) } + +// putVersionedObject handles PUT operations for versioned buckets using the new layout +// where all versions (including latest) are stored in the .versions directory +func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) { + // Generate version ID + versionId = generateVersionId() + + glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object) + + // Create the version file name + versionFileName := s3a.getVersionFileName(versionId) + + // Upload directly to the versions directory + // We need to construct the object path relative to the bucket + versionObjectPath := object + ".versions/" + versionFileName + versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath) + + hash := md5.New() + var body = io.TeeReader(dataReader, hash) + if objectContentType == "" { + body = mimeDetect(r, body) + } + + glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl) + + etag, errCode = s3a.putToFiler(r, versionUploadUrl, body, "", bucket) + if errCode != s3err.ErrNone { + glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) + return "", "", errCode + } + + // Get the uploaded entry to add versioning metadata + bucketDir := s3a.option.BucketsPath + "/" + bucket + versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath) + if err != nil { + glog.Errorf("putVersionedObject: failed to get version entry: %v", err) + return "", "", s3err.ErrInternalError + } + + // Add versioning metadata to this version + if versionEntry.Extended == nil { + versionEntry.Extended = make(map[string][]byte) + } + versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) + + // Store ETag with quotes for S3 compatibility + if !strings.HasPrefix(etag, "\"") { + etag = "\"" + etag + "\"" + } + versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag) + + // Update the version entry with metadata + err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = versionEntry.Extended + updatedEntry.Attributes = versionEntry.Attributes + updatedEntry.Chunks = versionEntry.Chunks + }) + if err != nil { + glog.Errorf("putVersionedObject: failed to update version metadata: %v", err) + return "", "", s3err.ErrInternalError + } + + // Update the .versions directory metadata to indicate this is the latest version + err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName) + if err != nil { + glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) + return "", "", s3err.ErrInternalError + } + + glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object) + return versionId, etag, s3err.ErrNone +} + +// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version +func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error { + bucketDir := s3a.option.BucketsPath + "/" + bucket + versionsObjectPath := object + ".versions" + + // Get the current .versions directory entry + versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath) + if err != nil { + glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err) + return fmt.Errorf("failed to get .versions entry: %v", err) + } + + // Add or update the latest version metadata + if versionsEntry.Extended == nil { + versionsEntry.Extended = make(map[string][]byte) + } + versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(versionId) + versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(versionFileName) + + // Update the .versions directory entry with metadata + err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = versionsEntry.Extended + updatedEntry.Attributes = versionsEntry.Attributes + updatedEntry.Chunks = versionsEntry.Chunks + }) + if err != nil { + glog.Errorf("updateLatestVersionInDirectory: failed to update .versions directory metadata: %v", err) + return fmt.Errorf("failed to update .versions directory metadata: %v", err) + } + + return nil +} |
