aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_put.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--weed/s3api/s3api_object_handlers_put.go157
1 files changed, 148 insertions, 9 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 0b0be5fe5..8b85a049a 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -71,19 +71,53 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
return
}
} else {
- uploadUrl := s3a.toFilerUrl(bucket, object)
- if objectContentType == "" {
- dataReader = mimeDetect(r, dataReader)
+ // Check if versioning is enabled for the bucket
+ versioningEnabled, err := s3a.isVersioningEnabled(bucket)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
+ return
+ }
+ glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
}
- etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
+ glog.V(1).Infof("PutObjectHandler: bucket %s, object %s, versioningEnabled=%v", bucket, object, versioningEnabled)
- if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
+ if versioningEnabled {
+ // Handle versioned PUT
+ glog.V(1).Infof("PutObjectHandler: using versioned PUT for %s/%s", bucket, object)
+ versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
+ if errCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, errCode)
+ return
+ }
+
+ // Set version ID in response header
+ if versionId != "" {
+ w.Header().Set("x-amz-version-id", versionId)
+ }
+
+ // Set ETag in response
+ setEtag(w, etag)
+ } else {
+ // Handle regular PUT (non-versioned)
+ glog.V(1).Infof("PutObjectHandler: using regular PUT for %s/%s", bucket, object)
+ uploadUrl := s3a.toFilerUrl(bucket, object)
+ if objectContentType == "" {
+ dataReader = mimeDetect(r, dataReader)
+ }
+
+ etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket)
+
+ if errCode != s3err.ErrNone {
+ s3err.WriteErrorResponse(w, r, errCode)
+ return
+ }
- setEtag(w, etag)
+ setEtag(w, etag)
+ }
}
stats_collect.RecordBucketActiveTime(bucket)
stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()
@@ -195,3 +229,108 @@ func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string
}
return string(encodedJwt)
}
+
+// putVersionedObject handles PUT operations for versioned buckets using the new layout
+// where all versions (including latest) are stored in the .versions directory
+func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
+ // Generate version ID
+ versionId = generateVersionId()
+
+ glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s", versionId, bucket, object)
+
+ // Create the version file name
+ versionFileName := s3a.getVersionFileName(versionId)
+
+ // Upload directly to the versions directory
+ // We need to construct the object path relative to the bucket
+ versionObjectPath := object + ".versions/" + versionFileName
+ versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
+
+ hash := md5.New()
+ var body = io.TeeReader(dataReader, hash)
+ if objectContentType == "" {
+ body = mimeDetect(r, body)
+ }
+
+ glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
+
+ etag, errCode = s3a.putToFiler(r, versionUploadUrl, body, "", bucket)
+ if errCode != s3err.ErrNone {
+ glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
+ return "", "", errCode
+ }
+
+ // Get the uploaded entry to add versioning metadata
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionEntry, err := s3a.getEntry(bucketDir, versionObjectPath)
+ if err != nil {
+ glog.Errorf("putVersionedObject: failed to get version entry: %v", err)
+ return "", "", s3err.ErrInternalError
+ }
+
+ // Add versioning metadata to this version
+ if versionEntry.Extended == nil {
+ versionEntry.Extended = make(map[string][]byte)
+ }
+ versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
+
+ // Store ETag with quotes for S3 compatibility
+ if !strings.HasPrefix(etag, "\"") {
+ etag = "\"" + etag + "\""
+ }
+ versionEntry.Extended[s3_constants.ExtETagKey] = []byte(etag)
+
+ // Update the version entry with metadata
+ err = s3a.mkFile(bucketDir, versionObjectPath, versionEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
+ updatedEntry.Extended = versionEntry.Extended
+ updatedEntry.Attributes = versionEntry.Attributes
+ updatedEntry.Chunks = versionEntry.Chunks
+ })
+ if err != nil {
+ glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
+ return "", "", s3err.ErrInternalError
+ }
+
+ // Update the .versions directory metadata to indicate this is the latest version
+ err = s3a.updateLatestVersionInDirectory(bucket, object, versionId, versionFileName)
+ if err != nil {
+ glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
+ return "", "", s3err.ErrInternalError
+ }
+
+ glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s", versionId, bucket, object)
+ return versionId, etag, s3err.ErrNone
+}
+
+// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
+func (s3a *S3ApiServer) updateLatestVersionInDirectory(bucket, object, versionId, versionFileName string) error {
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ versionsObjectPath := object + ".versions"
+
+ // Get the current .versions directory entry
+ versionsEntry, err := s3a.getEntry(bucketDir, versionsObjectPath)
+ if err != nil {
+ glog.Errorf("updateLatestVersionInDirectory: failed to get .versions entry: %v", err)
+ return fmt.Errorf("failed to get .versions entry: %v", err)
+ }
+
+ // Add or update the latest version metadata
+ if versionsEntry.Extended == nil {
+ versionsEntry.Extended = make(map[string][]byte)
+ }
+ versionsEntry.Extended[s3_constants.ExtLatestVersionIdKey] = []byte(versionId)
+ versionsEntry.Extended[s3_constants.ExtLatestVersionFileNameKey] = []byte(versionFileName)
+
+ // Update the .versions directory entry with metadata
+ err = s3a.mkFile(bucketDir, versionsObjectPath, versionsEntry.Chunks, func(updatedEntry *filer_pb.Entry) {
+ updatedEntry.Extended = versionsEntry.Extended
+ updatedEntry.Attributes = versionsEntry.Attributes
+ updatedEntry.Chunks = versionsEntry.Chunks
+ })
+ if err != nil {
+ glog.Errorf("updateLatestVersionInDirectory: failed to update .versions directory metadata: %v", err)
+ return fmt.Errorf("failed to update .versions directory metadata: %v", err)
+ }
+
+ return nil
+}