aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-11-10 20:30:21 -0800
committerGitHub <noreply@github.com>2025-11-10 20:30:21 -0800
commitbf8e4f40e60e74ce03c2f497c6245e5d1460f1d3 (patch)
treeeb42645861f4457411259e86f915cfa514cb192f /weed/s3api/s3api_object_handlers.go
parent6201cd099e55e4317f4a34291800a31c145bb803 (diff)
downloadseaweedfs-bf8e4f40e60e74ce03c2f497c6245e5d1460f1d3.tar.xz
seaweedfs-bf8e4f40e60e74ce03c2f497c6245e5d1460f1d3.zip
S3: Perf related (#7463)
* reduce checks * s3 object lookup optimization * Only check versioning configuration if client requests * Consolidate SSE Entry Lookups * optimize * revert optimization for versioned objects * Removed: getObjectEntryForSSE() function * refactor * Refactoring: Added fetchObjectEntryRequired * avoid refetching * return early if not found * reuse objects from conditional check * clear cache when creating bucket
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
-rw-r--r--weed/s3api/s3api_object_handlers.go144
1 files changed, 87 insertions, 57 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 8917393be..9d3b3dfc5 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -264,6 +264,8 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
+ // Note: We need to check this even if versionId is empty, because versioned buckets
+ // handle even "get latest version" requests differently (through .versions directory)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
@@ -344,31 +346,47 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
destUrl = s3a.toFilerUrl(bucket, object)
}
- // Check if this is a range request to an SSE object and modify the approach
+ // Fetch the correct entry for SSE processing (respects versionId)
+ // This consolidates entry lookups to avoid multiple filer calls
+ var objectEntryForSSE *filer_pb.Entry
originalRangeHeader := r.Header.Get("Range")
var sseObject = false
- // Pre-check if this object is SSE encrypted to avoid filer range conflicts
- if originalRangeHeader != "" {
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
- primarySSEType := s3a.detectPrimarySSEType(objectEntry)
- if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
- sseObject = true
- // Temporarily remove Range header to get full encrypted data from filer
- r.Header.Del("Range")
-
+ if versioningConfigured {
+ // For versioned objects, reuse the already-fetched entry
+ objectEntryForSSE = entry
+ } else {
+ // For non-versioned objects, try to reuse entry from conditional header check
+ if result.Entry != nil {
+ // Reuse entry fetched during conditional header check (optimization)
+ objectEntryForSSE = result.Entry
+ glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
+ } else {
+ // No conditional headers were checked, fetch entry for SSE processing
+ var fetchErr error
+ objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
+ if fetchErr != nil {
+ glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ if objectEntryForSSE == nil {
+ // Not found, return error early to avoid another lookup in proxyToFiler
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
}
}
}
- // Fetch the correct entry for SSE processing (respects versionId)
- objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
- if err != nil {
- glog.Errorf("GetObjectHandler: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
+ // Check if this is an SSE object for Range request handling
+ // This applies to both versioned and non-versioned objects
+ if originalRangeHeader != "" && objectEntryForSSE != nil {
+ primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
+ if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
+ sseObject = true
+ // Temporarily remove Range header to get full encrypted data from filer
+ r.Header.Del("Range")
+ }
}
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
@@ -415,6 +433,8 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
+ // Note: We need to check this even if versionId is empty, because versioned buckets
+ // handle even "get latest version" requests differently (through .versions directory)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
@@ -494,11 +514,31 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
// Fetch the correct entry for SSE processing (respects versionId)
- objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
- if err != nil {
- glog.Errorf("HeadObjectHandler: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
+ // For versioned objects, reuse already-fetched entry; for non-versioned, try to reuse from conditional check
+ var objectEntryForSSE *filer_pb.Entry
+ if versioningConfigured {
+ objectEntryForSSE = entry
+ } else {
+ // For non-versioned objects, try to reuse entry from conditional header check
+ if result.Entry != nil {
+ // Reuse entry fetched during conditional header check (optimization)
+ objectEntryForSSE = result.Entry
+ glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
+ } else {
+ // No conditional headers were checked, fetch entry for SSE processing
+ var fetchErr error
+ objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
+ if fetchErr != nil {
+ glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ if objectEntryForSSE == nil {
+ // Not found, return error early to avoid another lookup in proxyToFiler
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ }
}
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
@@ -658,21 +698,27 @@ func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bod
return statusCode, bytesTransferred
}
-// getObjectEntryForSSE fetches the correct filer entry for SSE processing
-// For versioned objects, it reuses the already-fetched entry
-// For non-versioned objects, it fetches the entry from the filer
-func (s3a *S3ApiServer) getObjectEntryForSSE(r *http.Request, versioningConfigured bool, versionedEntry *filer_pb.Entry) (*filer_pb.Entry, error) {
- if versioningConfigured {
- // For versioned objects, we already have the correct entry
- return versionedEntry, nil
+// fetchObjectEntry fetches the filer entry for an object
+// Returns nil if not found (not an error), or propagates other errors
+func (s3a *S3ApiServer) fetchObjectEntry(bucket, object string) (*filer_pb.Entry, error) {
+ objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
+ fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
+ if fetchErr != nil {
+ if errors.Is(fetchErr, filer_pb.ErrNotFound) {
+ return nil, nil // Not found is not an error for SSE check
+ }
+ return nil, fetchErr // Propagate other errors
}
+ return fetchedEntry, nil
+}
- // For non-versioned objects, fetch the entry
- bucket, object := s3_constants.GetBucketAndObject(r)
+// fetchObjectEntryRequired fetches the filer entry for an object
+// Returns an error if the object is not found or any other error occurs
+func (s3a *S3ApiServer) fetchObjectEntryRequired(bucket, object string) (*filer_pb.Entry, error) {
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
- fetchedEntry, err := s3a.getEntry("", objectPath)
- if err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
- return nil, fmt.Errorf("failed to get entry for SSE check %s: %w", objectPath, err)
+ fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
+ if fetchErr != nil {
+ return nil, fetchErr // Return error for both not-found and other errors
}
return fetchedEntry, nil
}
@@ -750,7 +796,7 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.
if sseCChunks >= 1 {
// Handle chunked SSE-C objects - each chunk needs independent decryption
- multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse)
+ multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse, entry)
if decErr != nil {
glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -966,7 +1012,7 @@ func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *htt
var decryptedReader io.Reader
if isMultipartSSEKMS {
// Handle multipart SSE-KMS objects - each chunk needs independent decryption
- multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse)
+ multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry)
if decErr != nil {
glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -1271,16 +1317,8 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
}
// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
-func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
- // Get the object path from the request
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
-
- // Get the object entry from filer to access chunk information
- entry, err := s3a.getEntry("", objectPath)
- if err != nil {
- return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err)
- }
+func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
+ // Entry is passed from caller to avoid redundant filer lookup
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
@@ -1531,22 +1569,14 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
// Each chunk has its own IV and encryption key from the original multipart parts
-func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
+func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
// Parse SSE-C headers from the request for decryption key
customerKey, err := ParseSSECHeaders(r)
if err != nil {
return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err)
}
- // Get the object path from the request
- bucket, object := s3_constants.GetBucketAndObject(r)
- objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
-
- // Get the object entry from filer to access chunk information
- entry, err := s3a.getEntry("", objectPath)
- if err != nil {
- return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err)
- }
+ // Entry is passed from caller to avoid redundant filer lookup
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()