| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-11-10 20:30:21 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-10 20:30:21 -0800 |
| commit | bf8e4f40e60e74ce03c2f497c6245e5d1460f1d3 (patch) | |
| tree | eb42645861f4457411259e86f915cfa514cb192f /weed/s3api/s3api_object_handlers.go | |
| parent | 6201cd099e55e4317f4a34291800a31c145bb803 (diff) | |
| download | seaweedfs-bf8e4f40e60e74ce03c2f497c6245e5d1460f1d3.tar.xz seaweedfs-bf8e4f40e60e74ce03c2f497c6245e5d1460f1d3.zip | |
S3: Perf related (#7463)
* reduce checks
* s3 object lookup optimization
* Only check versioning configuration if client requests
* Consolidate SSE Entry Lookups (see the sketch after this list)
* optimize
* revert optimization for versioned objects
* Removed: getObjectEntryForSSE() function
* refactor
* Refactoring: Added fetchObjectEntryRequired
* avoid refetching
* return early if not found
* reuse objects from conditional check
* clear cache when creating bucket
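
The lookup consolidation described in the bullets above can be pictured with a small, self-contained sketch. Everything below (`Entry`, `fakeStore`, `resolveEntryForSSE`) is a hypothetical stand-in for illustration, not the SeaweedFS types; only the flow mirrors this commit: reuse an entry already fetched by the conditional-header check, otherwise fetch it once, and return early if the object is missing.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in types for illustration only: the real handlers work with
// *filer_pb.Entry and the S3ApiServer's filer client.
type Entry struct{ Path string }

var errNotFound = errors.New("no such key")

// fakeStore simulates the filer; it counts lookups so the reuse is visible.
type fakeStore struct {
	entries map[string]*Entry
	lookups int
}

// fetchObjectEntry models the optional lookup: a missing object is reported
// as (nil, nil) so the caller decides how to respond.
func (s *fakeStore) fetchObjectEntry(path string) (*Entry, error) {
	s.lookups++
	return s.entries[path], nil
}

// resolveEntryForSSE models the consolidated flow: reuse an entry already
// fetched by the conditional-header check when present, otherwise fetch
// exactly once and fail early if the object does not exist.
func resolveEntryForSSE(store *fakeStore, alreadyFetched *Entry, path string) (*Entry, error) {
	if alreadyFetched != nil {
		return alreadyFetched, nil // no extra filer round trip
	}
	e, err := store.fetchObjectEntry(path)
	if err != nil {
		return nil, err
	}
	if e == nil {
		return nil, errNotFound // return early instead of letting the proxy rediscover it
	}
	return e, nil
}

func main() {
	store := &fakeStore{entries: map[string]*Entry{"/buckets/b/o": {Path: "/buckets/b/o"}}}

	// Conditional-header check already loaded the entry: zero lookups here.
	cached := &Entry{Path: "/buckets/b/o"}
	if _, err := resolveEntryForSSE(store, cached, "/buckets/b/o"); err != nil {
		panic(err)
	}

	// No cached entry: exactly one lookup.
	if _, err := resolveEntryForSSE(store, nil, "/buckets/b/o"); err != nil {
		panic(err)
	}

	fmt.Println("filer lookups:", store.lookups) // prints: filer lookups: 1
}
```

Counting lookups in the fake store makes the saving visible: the cached path never touches the filer, and the uncached path touches it exactly once.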
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 144 |
|---|---|---|

1 file changed, 87 insertions, 57 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 8917393be..9d3b3dfc5 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -264,6 +264,8 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
 	versionId := r.URL.Query().Get("versionId")
 
 	// Check if versioning is configured for the bucket (Enabled or Suspended)
+	// Note: We need to check this even if versionId is empty, because versioned buckets
+	// handle even "get latest version" requests differently (through .versions directory)
 	versioningConfigured, err := s3a.isVersioningConfigured(bucket)
 	if err != nil {
 		if err == filer_pb.ErrNotFound {
@@ -344,31 +346,47 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
 		destUrl = s3a.toFilerUrl(bucket, object)
 	}
 
-	// Check if this is a range request to an SSE object and modify the approach
+	// Fetch the correct entry for SSE processing (respects versionId)
+	// This consolidates entry lookups to avoid multiple filer calls
+	var objectEntryForSSE *filer_pb.Entry
 	originalRangeHeader := r.Header.Get("Range")
 	var sseObject = false
 
-	// Pre-check if this object is SSE encrypted to avoid filer range conflicts
-	if originalRangeHeader != "" {
-		bucket, object := s3_constants.GetBucketAndObject(r)
-		objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
-		if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
-			primarySSEType := s3a.detectPrimarySSEType(objectEntry)
-			if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
-				sseObject = true
-				// Temporarily remove Range header to get full encrypted data from filer
-				r.Header.Del("Range")
-
+	if versioningConfigured {
+		// For versioned objects, reuse the already-fetched entry
+		objectEntryForSSE = entry
+	} else {
+		// For non-versioned objects, try to reuse entry from conditional header check
+		if result.Entry != nil {
+			// Reuse entry fetched during conditional header check (optimization)
+			objectEntryForSSE = result.Entry
+			glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
+		} else {
+			// No conditional headers were checked, fetch entry for SSE processing
+			var fetchErr error
+			objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
+			if fetchErr != nil {
+				glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
+				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+				return
+			}
+			if objectEntryForSSE == nil {
+				// Not found, return error early to avoid another lookup in proxyToFiler
+				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+				return
 			}
 		}
 	}
 
-	// Fetch the correct entry for SSE processing (respects versionId)
-	objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
-	if err != nil {
-		glog.Errorf("GetObjectHandler: %v", err)
-		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
-		return
+	// Check if this is an SSE object for Range request handling
+	// This applies to both versioned and non-versioned objects
+	if originalRangeHeader != "" && objectEntryForSSE != nil {
+		primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
+		if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
+			sseObject = true
+			// Temporarily remove Range header to get full encrypted data from filer
+			r.Header.Del("Range")
+		}
 	}
 
 	s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
@@ -415,6 +433,8 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
 	versionId := r.URL.Query().Get("versionId")
 
 	// Check if versioning is configured for the bucket (Enabled or Suspended)
+	// Note: We need to check this even if versionId is empty, because versioned buckets
+	// handle even "get latest version" requests differently (through .versions directory)
 	versioningConfigured, err := s3a.isVersioningConfigured(bucket)
 	if err != nil {
 		if err == filer_pb.ErrNotFound {
@@ -494,11 +514,31 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request)
 	}
 
 	// Fetch the correct entry for SSE processing (respects versionId)
-	objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
-	if err != nil {
-		glog.Errorf("HeadObjectHandler: %v", err)
-		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
-		return
+	// For versioned objects, reuse already-fetched entry; for non-versioned, try to reuse from conditional check
+	var objectEntryForSSE *filer_pb.Entry
+	if versioningConfigured {
+		objectEntryForSSE = entry
+	} else {
+		// For non-versioned objects, try to reuse entry from conditional header check
+		if result.Entry != nil {
+			// Reuse entry fetched during conditional header check (optimization)
+			objectEntryForSSE = result.Entry
+			glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
+		} else {
+			// No conditional headers were checked, fetch entry for SSE processing
+			var fetchErr error
+			objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
+			if fetchErr != nil {
+				glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
+				s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+				return
+			}
+			if objectEntryForSSE == nil {
+				// Not found, return error early to avoid another lookup in proxyToFiler
+				s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+				return
+			}
+		}
 	}
 
 	s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
@@ -658,21 +698,27 @@ func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bod
 	return statusCode, bytesTransferred
 }
 
-// getObjectEntryForSSE fetches the correct filer entry for SSE processing
-// For versioned objects, it reuses the already-fetched entry
-// For non-versioned objects, it fetches the entry from the filer
-func (s3a *S3ApiServer) getObjectEntryForSSE(r *http.Request, versioningConfigured bool, versionedEntry *filer_pb.Entry) (*filer_pb.Entry, error) {
-	if versioningConfigured {
-		// For versioned objects, we already have the correct entry
-		return versionedEntry, nil
+// fetchObjectEntry fetches the filer entry for an object
+// Returns nil if not found (not an error), or propagates other errors
+func (s3a *S3ApiServer) fetchObjectEntry(bucket, object string) (*filer_pb.Entry, error) {
+	objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
+	fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
+	if fetchErr != nil {
+		if errors.Is(fetchErr, filer_pb.ErrNotFound) {
+			return nil, nil // Not found is not an error for SSE check
+		}
+		return nil, fetchErr // Propagate other errors
 	}
+	return fetchedEntry, nil
+}
 
-	// For non-versioned objects, fetch the entry
-	bucket, object := s3_constants.GetBucketAndObject(r)
+// fetchObjectEntryRequired fetches the filer entry for an object
+// Returns an error if the object is not found or any other error occurs
+func (s3a *S3ApiServer) fetchObjectEntryRequired(bucket, object string) (*filer_pb.Entry, error) {
 	objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
-	fetchedEntry, err := s3a.getEntry("", objectPath)
-	if err != nil && !errors.Is(err, filer_pb.ErrNotFound) {
-		return nil, fmt.Errorf("failed to get entry for SSE check %s: %w", objectPath, err)
+	fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
+	if fetchErr != nil {
+		return nil, fetchErr // Return error for both not-found and other errors
 	}
 	return fetchedEntry, nil
 }
@@ -750,7 +796,7 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.
 
 	if sseCChunks >= 1 {
 		// Handle chunked SSE-C objects - each chunk needs independent decryption
-		multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse)
+		multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse, entry)
 		if decErr != nil {
 			glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr)
 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -966,7 +1012,7 @@ func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *htt
 	var decryptedReader io.Reader
 	if isMultipartSSEKMS {
 		// Handle multipart SSE-KMS objects - each chunk needs independent decryption
-		multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse)
+		multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry)
 		if decErr != nil {
 			glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
 			s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -1271,16 +1317,8 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
 }
 
 // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
-func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
-	// Get the object path from the request
-	bucket, object := s3_constants.GetBucketAndObject(r)
-	objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
-
-	// Get the object entry from filer to access chunk information
-	entry, err := s3a.getEntry("", objectPath)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err)
-	}
+func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
+	// Entry is passed from caller to avoid redundant filer lookup
 
 	// Sort chunks by offset to ensure correct order
 	chunks := entry.GetChunks()
@@ -1531,22 +1569,14 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
 
 // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
 // Each chunk has its own IV and encryption key from the original multipart parts
-func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) {
+func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
 	// Parse SSE-C headers from the request for decryption key
	customerKey, err := ParseSSECHeaders(r)
 	if err != nil {
 		return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err)
 	}
 
-	// Get the object path from the request
-	bucket, object := s3_constants.GetBucketAndObject(r)
-	objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
-
-	// Get the object entry from filer to access chunk information
-	entry, err := s3a.getEntry("", objectPath)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err)
-	}
+	// Entry is passed from caller to avoid redundant filer lookup
 
 	// Sort chunks by offset to ensure correct order
 	chunks := entry.GetChunks()
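
The two helpers added by this commit differ only in how a missing object is reported. A minimal stand-alone sketch of that contract, with `getEntry` and `errNotFound` as hypothetical stand-ins for the real filer client and `filer_pb.ErrNotFound`:

```go
package main

import (
	"errors"
	"fmt"
)

// Illustration only: errNotFound and getEntry stand in for filer_pb.ErrNotFound
// and S3ApiServer.getEntry; the two wrappers mirror the helpers added in the diff.
type Entry struct{ Path string }

var errNotFound = errors.New("filer: not found")

func getEntry(path string) (*Entry, error) {
	return nil, errNotFound // pretend the object does not exist
}

// fetchObjectEntry: optional lookup. Not-found is collapsed to (nil, nil) so the
// SSE pre-check can treat a missing object as a normal, non-error outcome.
func fetchObjectEntry(path string) (*Entry, error) {
	e, err := getEntry(path)
	if err != nil {
		if errors.Is(err, errNotFound) {
			return nil, nil
		}
		return nil, err
	}
	return e, nil
}

// fetchObjectEntryRequired: required lookup. Not-found stays an error for call
// sites that cannot do anything useful without the entry.
func fetchObjectEntryRequired(path string) (*Entry, error) {
	return getEntry(path)
}

func main() {
	if e, err := fetchObjectEntry("/buckets/b/missing"); err == nil && e == nil {
		fmt.Println("optional lookup: missing object reported as (nil, nil)")
	}
	if _, err := fetchObjectEntryRequired("/buckets/b/missing"); err != nil {
		fmt.Println("required lookup: missing object surfaces as:", err)
	}
}
```

The optional variant lets the GET/HEAD SSE pre-check treat a missing object as a normal case (the handler then answers NoSuchKey itself and skips the redundant lookup in proxyToFiler), while the required variant is for call sites that cannot proceed without the entry.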
