diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 141 |
1 files changed, 137 insertions, 4 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 8e5008219..5163a72c2 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -3,14 +3,15 @@ package s3api import ( "bytes" "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "io" "net/http" "net/url" "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/util/mem" @@ -120,7 +121,73 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) return } - destUrl := s3a.toFilerUrl(bucket, object) + // Check for specific version ID in query parameters + versionId := r.URL.Query().Get("versionId") + + // Check if versioning is enabled for the bucket + versioningEnabled, err := s3a.isVersioningEnabled(bucket) + if err != nil { + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + return + } + glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + var destUrl string + + if versioningEnabled { + // Handle versioned GET - all versions are stored in .versions directory + var targetVersionId string + var entry *filer_pb.Entry + + if versionId != "" { + // Request for specific version + glog.V(2).Infof("GetObject: requesting specific version %s for %s/%s", versionId, bucket, object) + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + if err != nil { + glog.Errorf("Failed to get specific version %s: %v", versionId, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + targetVersionId = versionId + } else { + // Request for latest version + glog.V(2).Infof("GetObject: requesting latest version for %s/%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("Failed to get latest version: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) + } + } + } + + // Check if this is a delete marker + if entry.Extended != nil { + if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } + + // All versions are stored in .versions directory + versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId) + destUrl = s3a.toFilerUrl(bucket, versionObjectPath) + glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl) + + // Set version ID in response header + w.Header().Set("x-amz-version-id", targetVersionId) + } else { + // Handle regular GET (non-versioned) + destUrl = s3a.toFilerUrl(bucket, object) + } s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse) } @@ -130,7 +197,73 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request bucket, object := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object) - destUrl := s3a.toFilerUrl(bucket, object) + // Check for specific version ID in query parameters + versionId := r.URL.Query().Get("versionId") + + // Check if versioning is enabled for the bucket + versioningEnabled, err := s3a.isVersioningEnabled(bucket) + if err != nil { + if err == filer_pb.ErrNotFound { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + return + } + glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + var destUrl string + + if versioningEnabled { + // Handle versioned HEAD - all versions are stored in .versions directory + var targetVersionId string + var entry *filer_pb.Entry + + if versionId != "" { + // Request for specific version + glog.V(2).Infof("HeadObject: requesting specific version %s for %s/%s", versionId, bucket, object) + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + if err != nil { + glog.Errorf("Failed to get specific version %s: %v", versionId, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + targetVersionId = versionId + } else { + // Request for latest version + glog.V(2).Infof("HeadObject: requesting latest version for %s/%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("Failed to get latest version: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) + } + } + } + + // Check if this is a delete marker + if entry.Extended != nil { + if deleteMarker, exists := entry.Extended[s3_constants.ExtDeleteMarkerKey]; exists && string(deleteMarker) == "true" { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } + + // All versions are stored in .versions directory + versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId) + destUrl = s3a.toFilerUrl(bucket, versionObjectPath) + glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl) + + // Set version ID in response header + w.Header().Set("x-amz-version-id", targetVersionId) + } else { + // Handle regular HEAD (non-versioned) + destUrl = s3a.toFilerUrl(bucket, object) + } s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse) } |
