path: root/weed/s3api/s3api_object_handlers_put.go
author    Chris Lu <chrislusf@users.noreply.github.com>  2025-11-18 23:18:35 -0800
committer GitHub <noreply@github.com>  2025-11-18 23:18:35 -0800
commit    ca84a8a7131e2be81ead697c472bf967548d97ec (patch)
tree      986045b75ce20c2d20d89197bc78bae2fa3cec27 /weed/s3api/s3api_object_handlers_put.go
parent    7c8896c3ffa94cc772aabab73fa2da3a6f5398e7 (diff)
download  seaweedfs-ca84a8a7131e2be81ead697c472bf967548d97ec.tar.xz
          seaweedfs-ca84a8a7131e2be81ead697c472bf967548d97ec.zip
S3: Directly read/write volume servers (#7481)
* Lazy Versioning Check, Conditional SSE Entry Fetch, HEAD Request Optimization
* revert
  Reverted the conditional versioning check to always check versioning status
  Reverted the conditional SSE entry fetch to always fetch entry metadata
* Lazy Entry Fetch for SSE, Skip Conditional Header Check
* SSE-KMS headers are present, this is not an SSE-C request (mutually exclusive)
* SSE-C is mutually exclusive with SSE-S3 and SSE-KMS
* refactor
* Removed Premature Mutual Exclusivity Check
* check for the presence of the X-Amz-Server-Side-Encryption header
* not used
* fmt
* directly read/write volume servers
* HTTP Range Request Support
* set header
* md5
* copy object
* fix sse
* fmt
* implement sse
* sse continue
* fixed the suffix range bug (bytes=-N for "last N bytes")
* debug logs
* Missing PartsCount Header
* profiling
* url encoding
* test_multipart_get_part
* headers
* debug
* adjust log level
* handle part number
* Update s3api_object_handlers.go
* nil safety
* set ModifiedTsNs
* remove
* nil check
* fix sse header
* same logic as filer
* decode values
* decode ivBase64
* s3: Fix SSE decryption JWT authentication and streaming errors

  Critical fix for SSE (Server-Side Encryption) test failures:

  1. **JWT Authentication Bug** (Root Cause):
     - Changed from GenJwtForFilerServer to GenJwtForVolumeServer
     - S3 API now uses correct JWT when directly reading from volume servers
     - Matches filer's authentication pattern for direct volume access
     - Fixes 'unexpected EOF' and 500 errors in SSE tests

  2. **Streaming Error Handling**:
     - Added error propagation in getEncryptedStreamFromVolumes goroutine
     - Use CloseWithError() to properly communicate stream failures
     - Added debug logging for streaming errors

  3. **Response Header Timing**:
     - Removed premature WriteHeader(http.StatusOK) call
     - Let Go's http package write status automatically on first write
     - Prevents header lock when errors occur during streaming

  4. **Enhanced SSE Decryption Debugging**:
     - Added IV/Key validation and logging for SSE-C, SSE-KMS, SSE-S3
     - Better error messages for missing or invalid encryption metadata
     - Added glog.V(2) debugging for decryption setup

  This fixes SSE integration test failures where encrypted objects could not be retrieved due to volume server authentication failures. The JWT bug was causing volume servers to reject requests, resulting in truncated/empty streams (EOF) or internal errors.

* s3: Fix SSE multipart upload metadata preservation

  Critical fix for SSE multipart upload test failures (SSE-C and SSE-KMS):

  **Root Cause - Incomplete SSE Metadata Copying**:
  The old code only tried to copy 'SeaweedFSSSEKMSKey' from the first part to the completed object. This had TWO bugs:

  1. **Wrong Constant Name** (Key Mismatch Bug):
     - Storage uses: SeaweedFSSSEKMSKeyHeader = 'X-SeaweedFS-SSE-KMS-Key'
     - Old code read: SeaweedFSSSEKMSKey = 'x-seaweedfs-sse-kms-key'
     - Result: SSE-KMS metadata was NEVER copied → 500 errors

  2. **Missing SSE-C and SSE-S3 Headers**:
     - SSE-C requires: IV, Algorithm, KeyMD5
     - SSE-S3 requires: encrypted key data + standard headers
     - Old code: copied nothing for SSE-C/SSE-S3 → decryption failures

  **Fix - Complete SSE Header Preservation**:
  Now copies ALL SSE headers from first part to completed object:
  - SSE-C: SeaweedFSSSEIV, CustomerAlgorithm, CustomerKeyMD5
  - SSE-KMS: SeaweedFSSSEKMSKeyHeader, AwsKmsKeyId, ServerSideEncryption
  - SSE-S3: SeaweedFSSSES3Key, ServerSideEncryption

  Applied consistently to all 3 code paths:
  1. Versioned buckets (creates version file)
  2. Suspended versioning (creates main object with null versionId)
  3. Non-versioned buckets (creates main object)

  **Why This Is Correct**:
  The headers copied EXACTLY match what putToFiler stores during part upload (lines 496-521 in s3api_object_handlers_put.go). This ensures detectPrimarySSEType() can correctly identify encrypted multipart objects and trigger inline decryption with proper metadata.

  Fixes: TestSSEMultipartUploadIntegration (SSE-C and SSE-KMS subtests)

* s3: Add debug logging for versioning state diagnosis

  Temporary debug logging to diagnose test_versioning_obj_plain_null_version_overwrite_suspended failure.

  Added glog.V(0) logging to show:
  1. setBucketVersioningStatus: when versioning status is changed
  2. PutObjectHandler: what versioning state is detected (Enabled/Suspended/none)
  3. PutObjectHandler: which code path is taken (putVersionedObject vs putSuspendedVersioningObject)

  This will help identify if:
  - The versioning status is being set correctly in bucket config
  - The cache is returning stale/incorrect versioning state
  - The switch statement is correctly routing to suspended vs enabled handlers

* s3: Enhanced versioning state tracing for suspended versioning diagnosis

  Added comprehensive logging across the entire versioning state flow:

  PutBucketVersioningHandler:
  - Log requested status (Enabled/Suspended)
  - Log when calling setBucketVersioningStatus
  - Log success/failure of status change

  setBucketVersioningStatus:
  - Log bucket and status being set
  - Log when config is updated
  - Log completion with error code

  updateBucketConfig:
  - Log versioning state being written to cache
  - Immediate cache verification after Set
  - Log if cache verification fails

  getVersioningState:
  - Log bucket name and state being returned
  - Log if object lock forces VersioningEnabled
  - Log errors

  This will reveal:
  1. If PutBucketVersioning(Suspended) is reaching the handler
  2. If the cache update succeeds
  3. What state getVersioningState returns during PUT
  4. Any cache consistency issues

  Expected to show why bucket still reports 'Enabled' after 'Suspended' call.

* s3: Add SSE chunk detection debugging for multipart uploads

  Added comprehensive logging to diagnose why TestSSEMultipartUploadIntegration fails:

  detectPrimarySSEType now logs:
  1. Total chunk count and extended header count
  2. All extended headers with 'sse'/'SSE'/'encryption' in the name
  3. For each chunk: index, SseType, and whether it has metadata
  4. Final SSE type counts (SSE-C, SSE-KMS, SSE-S3)

  This will reveal if:
  - Chunks are missing SSE metadata after multipart completion
  - Extended headers are copied correctly from first part
  - The SSE detection logic is working correctly

  Expected to show if chunks have SseType=0 (none) or proper SSE types set.

* s3: Trace SSE chunk metadata through multipart completion and retrieval

  Added end-to-end logging to track SSE chunk metadata lifecycle:

  **During Multipart Completion (filer_multipart.go)**:
  1. Log finalParts chunks BEFORE mkFile - shows SseType and metadata
  2. Log versionEntry.Chunks INSIDE mkFile callback - shows if mkFile preserves SSE info
  3. Log success after mkFile completes

  **During GET Retrieval (s3api_object_handlers.go)**:
  1. Log retrieved entry chunks - shows SseType and metadata after retrieval
  2. Log detected SSE type result

  This will reveal at which point SSE chunk metadata is lost:
  - If finalParts have SSE metadata but versionEntry.Chunks don't → mkFile bug
  - If versionEntry.Chunks have SSE metadata but retrieved chunks don't → storage/retrieval bug
  - If chunks never have SSE metadata → multipart completion SSE processing bug

  Expected to show chunks with SseType=NONE during retrieval even though they were created with proper SseType during multipart completion.

* s3: Fix SSE-C multipart IV base64 decoding bug

  **Critical Bug Found**: SSE-C multipart uploads were failing because:

  Root Cause:
  - entry.Extended[SeaweedFSSSEIV] stores base64-encoded IV (24 bytes for 16-byte IV)
  - SerializeSSECMetadata expects raw IV bytes (16 bytes)
  - During multipart completion, we were passing base64 IV directly → serialization error

  Error Message:
  "Failed to serialize SSE-C metadata for chunk in part X: invalid IV length: expected 16 bytes, got 24"

  Fix:
  - Base64-decode IV before passing to SerializeSSECMetadata
  - Added error handling for decode failures

  Impact:
  - SSE-C multipart uploads will now correctly serialize chunk metadata
  - Chunks will have proper SSE metadata for decryption during GET

  This fixes the SSE-C subtest of TestSSEMultipartUploadIntegration. SSE-KMS still has a separate issue (error code 23) being investigated.

* fixes
* kms sse
* handle retry if not found in .versions folder and should read the normal object
* quick check (no retries) to see if the .versions/ directory exists
* skip retry if object is not found
* explicit update to avoid sync delay
* fix map update lock
* Remove fmt.Printf debug statements
* Fix SSE-KMS multipart base IV fallback to fail instead of regenerating
* fmt
* Fix ACL grants storage logic
* header handling
* nil handling
* range read for sse content
* test range requests for sse objects
* fmt
* unused code
* upload in chunks
* header case
* fix url
* bucket policy error vs bucket not found
* jwt handling
* fmt
* jwt in request header
* Optimize Case-Insensitive Prefix Check
* dead code
* Eliminated Unnecessary Stream Prefetch for Multipart SSE
* range sse
* sse
* refactor
* context
* fmt
* fix type
* fix SSE-C IV Mismatch
* Fix Headers Being Set After WriteHeader
* fix url parsing
* propagate sse headers
* multipart sse-s3
* aws sig v4 authen
* sse kms
* set content range
* better errors
* Update s3api_object_handlers_copy.go
* Update s3api_object_handlers.go
* Update s3api_object_handlers.go
* avoid magic number
* clean up
* Update s3api_bucket_policy_handlers.go
* fix url parsing
* context
* data and metadata both use background context
* adjust the offset
* SSE Range Request IV Calculation
* adjust logs
* IV relative to offset in each part, not the whole file
* collect logs
* offset
* fix offset
* fix url
* logs
* variable
* jwt
* Multipart ETag semantics: conditionally set object-level Md5 for single-chunk uploads only.
* sse
* adjust IV and offset
* multipart boundaries
* ensures PUT and GET operations return consistent ETags
* Metadata Header Case
* CommonPrefixes Sorting with URL Encoding
* always sort
* remove the extra PathUnescape call
* fix the multipart get part ETag
* the FileChunk is created without setting ModifiedTsNs
* Sort CommonPrefixes lexicographically to match AWS S3 behavior
* set md5 for multipart uploads
* prevents any potential data loss or corruption in the small-file inline storage path
* compiles correctly
* decryptedReader will now be properly closed after use
* Fixed URL encoding and sort order for CommonPrefixes
* Update s3api_object_handlers_list.go
* SSE-x Chunk View Decryption
* Different IV offset calculations for single-part vs multipart objects
* still too verbose in logs
* less logs
* ensure correct conversion
* fix listing
* nil check
* minor fixes
* nil check
* single character delimiter
* optimize
* range on empty object or zero-length
* correct IV based on its position within that part, not its position in the entire object
* adjust offset
* offset
  Fetch FULL encrypted chunk (not just the range)
  Adjust IV by PartOffset/ChunkOffset only
  Decrypt full chunk
  Skip in the DECRYPTED stream to reach OffsetInChunk
* look breaking
* refactor
* error on no content
* handle intra-block byte skipping
* Incomplete HTTP Response Error Handling
* multipart SSE
* Update s3api_object_handlers.go
* address comments
* less logs
* handling directory
* Optimized rejectDirectoryObjectWithoutSlash() to avoid unnecessary lookups
* Revert "handling directory"
  This reverts commit 3a335f0ac33c63f51975abc63c40e5328857a74b.
* constant
* Consolidate nil entry checks in GetObjectHandler
* add range tests
* Consolidate redundant nil entry checks in HeadObjectHandler
* adjust logs
* SSE type
* large files
* large files
  Reverted the plain-object range test
* ErrNoEncryptionConfig
* Fixed SSERangeReader Infinite Loop Vulnerability
* Fixed SSE-KMS Multipart ChunkReader HTTP Body Leak
* handle empty directory in S3, added PyArrow tests
* purge unused code
* Update s3_parquet_test.py
* Update requirements.txt
* According to S3 specifications, when both partNumber and Range are present, the Range should apply within the selected part's boundaries, not to the full object.
* handle errors
* errors after writing header
* https
* fix: Wait for volume assignment readiness before running Parquet tests

  The test-implicit-dir-with-server test was failing with an Internal Error because volume assignment was not ready when tests started. This fix adds a check that attempts a volume assignment and waits for it to succeed before proceeding with tests. This ensures that:
  1. Volume servers are registered with the master
  2. Volume growth is triggered if needed
  3. The system can successfully assign volumes for writes

  Fixes the timeout issue where boto3 would retry 4 times and fail with 'We encountered an internal error, please try again.'

* sse tests
* store derived IV
* fix: Clean up gRPC ports between tests to prevent port conflicts

  The second test (test-implicit-dir-with-server) was failing because the volume server's gRPC port (18080 = VOLUME_PORT + 10000) was still in use from the first test. The cleanup code only killed HTTP port processes, not gRPC port processes.

  Added cleanup for gRPC ports in all stop targets:
  - Master gRPC: MASTER_PORT + 10000 (19333)
  - Volume gRPC: VOLUME_PORT + 10000 (18080)
  - Filer gRPC: FILER_PORT + 10000 (18888)

  This ensures clean state between test runs in CI.

* add import
* address comments
* docs: Add placeholder documentation files for Parquet test suite

  Added three missing documentation files referenced in test/s3/parquet/README.md:
  1. TEST_COVERAGE.md - Documents 43 total test cases (17 Go unit tests, 6 Python integration tests, 20 Python end-to-end tests)
  2. FINAL_ROOT_CAUSE_ANALYSIS.md - Explains the s3fs compatibility issue with PyArrow, the implicit directory problem, and how the fix works
  3. MINIO_DIRECTORY_HANDLING.md - Compares MinIO's directory handling approach with SeaweedFS's implementation

  Each file contains:
  - Title and overview
  - Key technical details relevant to the topic
  - TODO sections for future expansion

  These placeholder files resolve the broken README links and provide structure for future detailed documentation.

* clean up if metadata operation failed
* Update s3_parquet_test.py
* clean up
* Update Makefile
* Update s3_parquet_test.py
* Update Makefile
* Handle ivSkip for non-block-aligned offsets
* Update README.md
* stop volume server faster
* stop volume server in 1 second
* different IV for each chunk in SSE-S3 and SSE-KMS
* clean up if fails
* testing upload
* error propagation
* fmt
* simplify
* fix copying
* less logs
* endian
* Added marshaling error handling
* handling invalid ranges
* error handling for adding to log buffer
* fix logging
* avoid returning too quickly and ensure proper cleaning up
* Activity Tracking for Disk Reads
* Cleanup Unused Parameters
* Activity Tracking for Kafka Publishers
* Proper Test Error Reporting
* refactoring
* less logs
* less logs
* go fmt
* guard it with if entry.Attributes.TtlSec > 0 to match the pattern used elsewhere
* Handle bucket-default encryption config errors explicitly for multipart
* consistent activity tracking
* obsolete code for s3 on filer read/write handlers
* Update weed/s3api/s3api_object_handlers_list.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
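Many of the bullets above ("SSE Range Request IV Calculation", "IV relative to offset in each part, not the whole file", "Handle ivSkip for non-block-aligned offsets", "different IV for each chunk in SSE-S3 and SSE-KMS") revolve around one CTR-mode detail: the stored IV is a base counter, and each chunk or range must be decrypted with that counter advanced by the number of whole AES blocks preceding it, plus an intra-block skip. The sketch below illustrates only the arithmetic; `calculateIVWithOffset` is the name the diff uses, but this body is an assumption, not SeaweedFS's actual implementation.

```go
package main

import (
	"crypto/aes"
	"encoding/binary"
	"fmt"
)

// calculateIVWithOffsetSketch is a hypothetical illustration of the per-chunk
// IV adjustment the commit message describes: treat the 16-byte base IV as a
// 128-bit big-endian counter and advance it by the number of AES blocks that
// precede the chunk. It also returns the intra-block skip ("ivSkip") for
// offsets that are not block-aligned. The real calculateIVWithOffset may differ.
func calculateIVWithOffsetSketch(baseIV []byte, offset int64) ([]byte, int64) {
	iv := make([]byte, len(baseIV)) // assumes a 16-byte IV
	copy(iv, baseIV)

	blockOffset := offset / aes.BlockSize // whole blocks to skip
	ivSkip := offset % aes.BlockSize      // bytes to skip inside the first block

	// Add blockOffset to the low 64 bits, carrying into the high 64 bits.
	high := binary.BigEndian.Uint64(iv[0:8])
	low := binary.BigEndian.Uint64(iv[8:16])
	newLow := low + uint64(blockOffset)
	if newLow < low { // overflow => carry
		high++
	}
	binary.BigEndian.PutUint64(iv[0:8], high)
	binary.BigEndian.PutUint64(iv[8:16], newLow)

	return iv, ivSkip
}

func main() {
	baseIV := make([]byte, aes.BlockSize)
	iv, skip := calculateIVWithOffsetSketch(baseIV, 8*1024*1024+5)
	fmt.Printf("adjusted IV=%x, intra-block skip=%d\n", iv, skip)
}
```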
Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--  weed/s3api/s3api_object_handlers_put.go  694
1 file changed, 507 insertions(+), 187 deletions(-)
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 6ce48429f..f7105052e 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -1,25 +1,28 @@
package s3api
import (
- "crypto/md5"
+ "context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
+ "net/url"
+ "path/filepath"
"strconv"
"strings"
"time"
"github.com/pquerna/cachecontrol/cacheobject"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security"
- weed_server "github.com/seaweedfs/seaweedfs/weed/server"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
)
@@ -60,6 +63,13 @@ type BucketDefaultEncryptionResult struct {
SSEKMSKey *SSEKMSKey
}
+// SSEResponseMetadata holds encryption metadata needed for HTTP response headers
+type SSEResponseMetadata struct {
+ SSEType string
+ KMSKeyID string
+ BucketKeyEnabled bool
+}
+
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
@@ -135,7 +145,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
versioningConfigured := (versioningState != "")
- glog.V(2).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
+ glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
// Validate object lock headers before processing
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
@@ -158,29 +168,34 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
switch versioningState {
case s3_constants.VersioningEnabled:
// Handle enabled versioning - create new versions with real version IDs
- glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
- versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
+ glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
+ versionId, etag, errCode, sseMetadata := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
s3err.WriteErrorResponse(w, r, errCode)
return
}
- glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
+ glog.V(3).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
// Set version ID in response header
if versionId != "" {
w.Header().Set("x-amz-version-id", versionId)
- glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
+ glog.V(3).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
} else {
glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object)
}
// Set ETag in response
setEtag(w, etag)
+
+ // Set SSE response headers for versioned objects
+ s3a.setSSEResponseHeaders(w, r, sseMetadata)
+
case s3_constants.VersioningSuspended:
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
- etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
+ glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object)
+ etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
@@ -191,6 +206,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// Set ETag in response
setEtag(w, etag)
+
+ // Set SSE response headers for suspended versioning
+ s3a.setSSEResponseHeaders(w, r, sseMetadata)
default:
// Handle regular PUT (never configured versioning)
uploadUrl := s3a.toFilerUrl(bucket, object)
@@ -198,7 +216,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
dataReader = mimeDetect(r, dataReader)
}
- etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
+ etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
@@ -209,9 +227,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
setEtag(w, etag)
// Set SSE response headers based on encryption type used
- if sseType == s3_constants.SSETypeS3 {
- w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
- }
+ s3a.setSSEResponseHeaders(w, r, sseMetadata)
}
}
stats_collect.RecordBucketActiveTime(bucket)
@@ -220,15 +236,18 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeSuccessResponseEmpty(w, r)
}
-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
- // Calculate unique offset for each part to prevent IV reuse in multipart uploads
- // This is critical for CTR mode encryption security
- partOffset := calculatePartOffset(partNumber)
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+ // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
+ // This eliminates the filer proxy overhead for PUT operations
+
+ // For SSE, encrypt with offset=0 for all parts
+ // Each part is encrypted independently, then decrypted using metadata during GET
+ partOffset := int64(0)
- // Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
+ // Handle all SSE encryption types in a unified manner
sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
if sseErrorCode != s3err.ErrNone {
- return "", sseErrorCode, ""
+ return "", sseErrorCode, SSEResponseMetadata{}
}
// Extract results from unified SSE handling
@@ -239,6 +258,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
sseKMSMetadata := sseResult.SSEKMSMetadata
sseS3Key := sseResult.SSES3Key
sseS3Metadata := sseResult.SSES3Metadata
+ sseType := sseResult.SSEType
// Apply bucket default encryption if no explicit encryption was provided
// This implements AWS S3 behavior where bucket default encryption automatically applies
@@ -249,7 +269,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader)
if applyErr != nil {
glog.Errorf("Failed to apply bucket default encryption: %v", applyErr)
- return "", s3err.ErrInternalError, ""
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Update variables based on the result
@@ -257,121 +277,357 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
sseS3Key = encryptionResult.SSES3Key
sseKMSKey = encryptionResult.SSEKMSKey
+ // If bucket-default encryption selected an algorithm, reflect it in SSE type
+ if sseType == "" {
+ if sseS3Key != nil {
+ sseType = s3_constants.SSETypeS3
+ } else if sseKMSKey != nil {
+ sseType = s3_constants.SSETypeKMS
+ }
+ }
+
// If SSE-S3 was applied by bucket default, prepare metadata (if not already done)
if sseS3Key != nil && len(sseS3Metadata) == 0 {
var metaErr error
sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key)
if metaErr != nil {
glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr)
- return "", s3err.ErrInternalError, ""
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
}
} else {
glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
}
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
+ // Parse the upload URL to extract the file path
+ // uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.)
+ // Use proper URL parsing instead of string manipulation for robustness
+ parsedUrl, parseErr := url.Parse(uploadUrl)
+ if parseErr != nil {
+ glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr)
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
+ }
+
+ // Use parsedUrl.Path directly - it's already decoded by url.Parse()
+ // Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/"
+ // Calling PathUnescape again would double-decode and fail on keys like "b%ar"
+ filePath := parsedUrl.Path
- proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
+ // Step 1 & 2: Use auto-chunking to handle large files without OOM
+ // This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
+ const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
+ const smallFileLimit = 256 * 1024 // 256KB - store inline in filer
+ collection := ""
+ if s3a.option.FilerGroup != "" {
+ collection = s3a.getCollectionName(bucket)
+ }
+
+ // Create assign function for chunked upload
+ assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
+ var assignResult *filer_pb.AssignVolumeResponse
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
+ Count: int32(count),
+ Replication: "",
+ Collection: collection,
+ DiskType: "",
+ DataCenter: s3a.option.DataCenter,
+ Path: filePath,
+ })
+ if err != nil {
+ return fmt.Errorf("assign volume: %w", err)
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume: %v", resp.Error)
+ }
+ assignResult = resp
+ return nil
+ })
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Convert filer_pb.AssignVolumeResponse to operation.AssignResult
+ return nil, &operation.AssignResult{
+ Fid: assignResult.FileId,
+ Url: assignResult.Location.Url,
+ PublicUrl: assignResult.Location.PublicUrl,
+ Count: uint64(count),
+ Auth: security.EncodedJwt(assignResult.Auth),
+ }, nil
+ }
+
+ // Upload with auto-chunking
+ // Use context.Background() to ensure chunk uploads complete even if HTTP request is cancelled
+ // This prevents partial uploads and data corruption
+ chunkResult, err := operation.UploadReaderInChunks(context.Background(), dataReader, &operation.ChunkedUploadOption{
+ ChunkSize: chunkSize,
+ SmallFileLimit: smallFileLimit,
+ Collection: collection,
+ DataCenter: s3a.option.DataCenter,
+ SaveSmallInline: false, // S3 API always creates chunks, never stores inline
+ MimeType: r.Header.Get("Content-Type"),
+ AssignFunc: assignFunc,
+ })
if err != nil {
- glog.Errorf("NewRequest %s: %v", uploadUrl, err)
- return "", s3err.ErrInternalError, ""
- }
+ glog.Errorf("putToFiler: chunked upload failed: %v", err)
+
+ // CRITICAL: Cleanup orphaned chunks before returning error
+ // UploadReaderInChunks now returns partial results even on error,
+ // allowing us to cleanup any chunks that were successfully uploaded
+ // before the failure occurred
+ if chunkResult != nil && len(chunkResult.FileChunks) > 0 {
+ glog.Warningf("putToFiler: Upload failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
+ s3a.deleteOrphanedChunks(chunkResult.FileChunks)
+ }
- proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
- if destination != "" {
- proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
+ if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
+ return "", s3err.ErrInvalidDigest, SSEResponseMetadata{}
+ }
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
- if s3a.option.FilerGroup != "" {
- query := proxyReq.URL.Query()
- query.Add("collection", s3a.getCollectionName(bucket))
- proxyReq.URL.RawQuery = query.Encode()
- }
+ // Step 3: Calculate MD5 hash and add SSE metadata to chunks
+ md5Sum := chunkResult.Md5Hash.Sum(nil)
- for header, values := range r.Header {
- for _, value := range values {
- proxyReq.Header.Add(header, value)
+ glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d",
+ filePath, len(chunkResult.FileChunks), chunkResult.TotalSize)
+
+ // Log chunk details for debugging (verbose only - high frequency)
+ if glog.V(4) {
+ for i, chunk := range chunkResult.FileChunks {
+ glog.Infof(" PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size)
}
}
- // Log version ID header for debugging
- if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
- glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl)
+ // Add SSE metadata to all chunks if present
+ for _, chunk := range chunkResult.FileChunks {
+ switch {
+ case customerKey != nil:
+ // SSE-C: Create per-chunk metadata (matches filer logic)
+ chunk.SseType = filer_pb.SSEType_SSE_C
+ if len(sseIV) > 0 {
+ // PartOffset tracks position within the encrypted stream
+ // Since ALL uploads (single-part and multipart parts) encrypt starting from offset 0,
+ // PartOffset = chunk.Offset represents where this chunk is in that encrypted stream
+ // - Single-part: chunk.Offset is position in the file's encrypted stream
+ // - Multipart: chunk.Offset is position in this part's encrypted stream
+ ssecMetadataStruct := struct {
+ Algorithm string `json:"algorithm"`
+ IV string `json:"iv"`
+ KeyMD5 string `json:"keyMD5"`
+ PartOffset int64 `json:"partOffset"`
+ }{
+ Algorithm: "AES256",
+ IV: base64.StdEncoding.EncodeToString(sseIV),
+ KeyMD5: customerKey.KeyMD5,
+ PartOffset: chunk.Offset, // Position within the encrypted stream (always encrypted from 0)
+ }
+ if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil {
+ chunk.SseMetadata = ssecMetadata
+ }
+ }
+ case sseKMSKey != nil:
+ // SSE-KMS: Create per-chunk metadata with chunk-specific offsets
+ // Each chunk needs its own metadata with ChunkOffset set for proper IV calculation during decryption
+ chunk.SseType = filer_pb.SSEType_SSE_KMS
+
+ // Create a copy of the SSE-KMS key with chunk-specific offset
+ chunkSSEKey := &SSEKMSKey{
+ KeyID: sseKMSKey.KeyID,
+ EncryptedDataKey: sseKMSKey.EncryptedDataKey,
+ EncryptionContext: sseKMSKey.EncryptionContext,
+ BucketKeyEnabled: sseKMSKey.BucketKeyEnabled,
+ IV: sseKMSKey.IV,
+ ChunkOffset: chunk.Offset, // Set chunk-specific offset for IV calculation
+ }
+
+ // Serialize per-chunk metadata
+ if chunkMetadata, serErr := SerializeSSEKMSMetadata(chunkSSEKey); serErr == nil {
+ chunk.SseMetadata = chunkMetadata
+ } else {
+ glog.Errorf("Failed to serialize SSE-KMS metadata for chunk at offset %d: %v", chunk.Offset, serErr)
+ }
+ case sseS3Key != nil:
+ // SSE-S3: Create per-chunk metadata with chunk-specific IVs
+ // Each chunk needs its own IV calculated from the base IV + chunk offset
+ chunk.SseType = filer_pb.SSEType_SSE_S3
+
+ // Calculate chunk-specific IV using base IV and chunk offset
+ chunkIV, _ := calculateIVWithOffset(sseS3Key.IV, chunk.Offset)
+
+ // Create a copy of the SSE-S3 key with chunk-specific IV
+ chunkSSEKey := &SSES3Key{
+ Key: sseS3Key.Key,
+ KeyID: sseS3Key.KeyID,
+ Algorithm: sseS3Key.Algorithm,
+ IV: chunkIV, // Use chunk-specific IV
+ }
+
+ // Serialize per-chunk metadata
+ if chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey); serErr == nil {
+ chunk.SseMetadata = chunkMetadata
+ } else {
+ glog.Errorf("Failed to serialize SSE-S3 metadata for chunk at offset %d: %v", chunk.Offset, serErr)
+ }
+ }
}
- // Set object owner header for filer to extract
+ // Step 4: Create metadata entry
+ now := time.Now()
+ mimeType := r.Header.Get("Content-Type")
+ if mimeType == "" {
+ mimeType = "application/octet-stream"
+ }
+
+ // Create entry
+ entry := &filer_pb.Entry{
+ Name: filepath.Base(filePath),
+ IsDirectory: false,
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: now.Unix(),
+ Mtime: now.Unix(),
+ FileMode: 0660,
+ Uid: 0,
+ Gid: 0,
+ Mime: mimeType,
+ FileSize: uint64(chunkResult.TotalSize),
+ },
+ Chunks: chunkResult.FileChunks, // All chunks from auto-chunking
+ Extended: make(map[string][]byte),
+ }
+
+ // Set Md5 attribute based on context:
+ // 1. For multipart upload PARTS (stored in .uploads/ directory): ALWAYS set Md5
+ // - Parts must use simple MD5 ETags, never composite format
+ // - Even if a part has multiple chunks internally, its ETag is MD5 of entire part
+ // 2. For regular object uploads: only set Md5 for single-chunk uploads
+ // - Multi-chunk regular objects use composite "md5-count" format
+ isMultipartPart := strings.Contains(filePath, "/"+s3_constants.MultipartUploadsFolder+"/")
+ if isMultipartPart || len(chunkResult.FileChunks) == 1 {
+ entry.Attributes.Md5 = md5Sum
+ }
+
+ // Calculate ETag using the same logic as GET to ensure consistency
+ // For single chunk: uses entry.Attributes.Md5
+ // For multiple chunks: uses filer.ETagChunks() which returns "<hash>-<count>"
+ etag = filer.ETag(entry)
+ glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks))
+
+ // Set object owner
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
- proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
- glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
+ entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
+ glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
+ }
+
+ // Set version ID if present
+ if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
+ entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader)
+ glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
+ }
+
+ // Set TTL-based S3 expiry flag only if object has a TTL
+ if entry.Attributes.TtlSec > 0 {
+ entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+ }
+
+ // Copy user metadata and standard headers
+ for k, v := range r.Header {
+ if len(v) > 0 && len(v[0]) > 0 {
+ if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
+ // Go's HTTP server canonicalizes headers (e.g., x-amz-meta-foo → X-Amz-Meta-Foo)
+ // We store them as they come in (after canonicalization) to preserve the user's intent
+ entry.Extended[k] = []byte(v[0])
+ } else if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
+ entry.Extended[k] = []byte(v[0])
+ }
+ if k == "Response-Content-Disposition" {
+ entry.Extended["Content-Disposition"] = []byte(v[0])
+ }
+ }
}
- // Set SSE-C metadata headers for the filer if encryption was applied
+ // Set SSE-C metadata
if customerKey != nil && len(sseIV) > 0 {
- proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
- proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
- // Store IV in a custom header that the filer can use to store in entry metadata
- proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
+ // Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+ entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV
+ entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
+ entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5)
+ glog.V(3).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV))
}
- // Set SSE-KMS metadata headers for the filer if KMS encryption was applied
+ // Set SSE-KMS metadata
if sseKMSKey != nil {
- // Use already-serialized SSE-KMS metadata from helper function
- // Store serialized KMS metadata in a custom header that the filer can use
- proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
-
- glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
- } else {
- glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
+ // Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+ entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata
+ // Set standard SSE headers for detection
+ entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")
+ entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID)
+ glog.V(3).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata))
}
- // Set SSE-S3 metadata headers for the filer if S3 encryption was applied
+ // Set SSE-S3 metadata
if sseS3Key != nil && len(sseS3Metadata) > 0 {
- // Store serialized S3 metadata in a custom header that the filer can use
- proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
- glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
- }
- // Set TTL-based S3 expiry (modification time)
- proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true")
- // ensure that the Authorization header is overriding any previous
- // Authorization header which might be already present in proxyReq
- s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
- resp, postErr := s3a.client.Do(proxyReq)
-
- if postErr != nil {
- glog.Errorf("post to filer: %v", postErr)
- if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
- return "", s3err.ErrInvalidDigest, ""
+ // Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+ entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
+ // Set standard SSE header for detection
+ entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256")
+ glog.V(3).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata))
+ }
+
+ // Step 5: Save metadata to filer via gRPC
+ // Use context.Background() to ensure metadata save completes even if HTTP request is cancelled
+ // This matches the chunk upload behavior and prevents orphaned chunks
+ glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
+ filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended))
+ createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ req := &filer_pb.CreateEntryRequest{
+ Directory: filepath.Dir(filePath),
+ Entry: entry,
+ }
+ glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
+ _, err := client.CreateEntry(context.Background(), req)
+ if err != nil {
+ glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
}
- return "", s3err.ErrInternalError, ""
+ return err
+ })
+ if createErr != nil {
+ glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
+
+ // CRITICAL: Cleanup orphaned chunks before returning error
+ // If CreateEntry fails, the uploaded chunks are orphaned and must be deleted
+ // to prevent resource leaks and wasted storage
+ if len(chunkResult.FileChunks) > 0 {
+ glog.Warningf("putToFiler: CreateEntry failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
+ s3a.deleteOrphanedChunks(chunkResult.FileChunks)
+ }
+
+ return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{}
}
- defer resp.Body.Close()
+ glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
- etag = fmt.Sprintf("%x", hash.Sum(nil))
+ glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d",
+ filePath, etag, entry.Attributes.FileSize, partNumber)
- resp_body, ra_err := io.ReadAll(resp.Body)
- if ra_err != nil {
- glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
- return etag, s3err.ErrInternalError, ""
- }
- var ret weed_server.FilerPostResult
- unmarshal_err := json.Unmarshal(resp_body, &ret)
- if unmarshal_err != nil {
- glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
- return "", s3err.ErrInternalError, ""
- }
- if ret.Error != "" {
- glog.Errorf("upload to filer error: %v", ret.Error)
- return "", filerErrorToS3Error(ret.Error), ""
+ BucketTrafficReceived(chunkResult.TotalSize, r)
+
+ // Build SSE response metadata with encryption details
+ responseMetadata := SSEResponseMetadata{
+ SSEType: sseType,
}
- BucketTrafficReceived(ret.Size, r)
+ // For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata
+ if sseKMSKey != nil {
+ responseMetadata.KMSKeyID = sseKMSKey.KeyID
+ responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled
+ glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v",
+ sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled)
+ }
- // Return the SSE type determined by the unified handler
- return etag, s3err.ErrNone, sseResult.SSEType
+ return etag, s3err.ErrNone, responseMetadata
}
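The Md5/ETag handling in putToFiler above keeps plain MD5 ETags for multipart parts and single-chunk objects, and a composite `<hash>-<count>` ETag (via `filer.ETagChunks()`) for multi-chunk regular objects. For orientation, S3-style composite ETags are conventionally the MD5 of the concatenated per-part MD5 digests with a part-count suffix; the sketch below shows that shape and is an assumption about the general convention, not the exact `ETagChunks` algorithm.

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// compositeETag illustrates the AWS-style multipart ETag: MD5 over the
// concatenation of each part's raw MD5 digest, suffixed with "-<partCount>".
// Single-part uploads keep a plain MD5. SeaweedFS's filer.ETagChunks may
// differ in detail; this is a sketch of the general convention only.
func compositeETag(parts [][]byte) string {
	if len(parts) == 1 {
		return fmt.Sprintf("%x", md5.Sum(parts[0])) // single part: plain MD5
	}
	var all []byte
	for _, p := range parts {
		sum := md5.Sum(p)
		all = append(all, sum[:]...) // concatenate raw digests, not hex strings
	}
	return fmt.Sprintf("%x-%d", md5.Sum(all), len(parts))
}

func main() {
	parts := [][]byte{[]byte("part one data"), []byte("part two data")}
	fmt.Println(compositeETag(parts)) // prints "<hash>-2"
}
```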
func setEtag(w http.ResponseWriter, etag string) {
@@ -384,6 +640,43 @@ func setEtag(w http.ResponseWriter, etag string) {
}
}
+// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
+func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) {
+ switch sseMetadata.SSEType {
+ case s3_constants.SSETypeS3:
+ // SSE-S3: Return the encryption algorithm
+ w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
+
+ case s3_constants.SSETypeC:
+ // SSE-C: Echo back the customer-provided algorithm and key MD5
+ if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo)
+ }
+ if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5)
+ }
+
+ case s3_constants.SSETypeKMS:
+ // SSE-KMS: Return the KMS key ID and algorithm
+ w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
+
+ // Use metadata from stored encryption config (for bucket-default encryption)
+ // or fall back to request headers (for explicit encryption)
+ if sseMetadata.KMSKeyID != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID)
+ } else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
+ }
+
+ // Set bucket-key-enabled header if it was enabled
+ if sseMetadata.BucketKeyEnabled {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
+ } else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
+ }
+ }
+}
+
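From the client's perspective, setSSEResponseHeaders means an SSE-C PUT gets its algorithm and key MD5 echoed back, while SSE-KMS responses carry the key ID and optional bucket-key flag. A minimal client-side sketch of the SSE-C contract, using plain net/http and the standard AWS header names, follows; the endpoint, bucket, and key are placeholders, and a real request would also need SigV4 signing.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/base64"
	"fmt"
	"net/http"
)

func main() {
	key := bytes.Repeat([]byte{0x42}, 32) // 256-bit customer key (example only)
	keyMD5 := md5.Sum(key)

	req, _ := http.NewRequest(http.MethodPut,
		"http://localhost:8333/my-bucket/my-object", // placeholder S3 endpoint
		bytes.NewReader([]byte("hello")))
	req.Header.Set("x-amz-server-side-encryption-customer-algorithm", "AES256")
	req.Header.Set("x-amz-server-side-encryption-customer-key", base64.StdEncoding.EncodeToString(key))
	req.Header.Set("x-amz-server-side-encryption-customer-key-MD5", base64.StdEncoding.EncodeToString(keyMD5[:]))
	// NOTE: real requests also carry AWS SigV4 authentication; omitted here.

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	// On success the server echoes the algorithm and key MD5 back:
	fmt.Println(resp.Header.Get("x-amz-server-side-encryption-customer-algorithm"))
	fmt.Println(resp.Header.Get("x-amz-server-side-encryption-customer-key-MD5"))
}
```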
func filerErrorToS3Error(errString string) s3err.ErrorCode {
switch {
case errString == constants.ErrMsgBadDigest:
@@ -400,26 +693,6 @@ func filerErrorToS3Error(errString string) s3err.ErrorCode {
}
}
-func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) {
- encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite)
-
- if encodedJwt == "" {
- return
- }
-
- r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
-}
-
-func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string {
- var encodedJwt security.EncodedJwt
- if isWrite {
- encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec)
- } else {
- encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)
- }
- return string(encodedJwt)
-}
-
// setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user
func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) {
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
@@ -446,19 +719,12 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
//
// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
-func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// Normalize object path to ensure consistency with toFilerUrl behavior
normalizedObject := removeDuplicateSlashes(object)
- // Enable detailed logging for testobjbar
- isTestObj := (normalizedObject == "testobjbar")
-
- glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
- bucket, object, normalizedObject, isTestObj)
-
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
- }
+ glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s",
+ bucket, object, normalizedObject)
bucketDir := s3a.option.BucketsPath + "/" + bucket
@@ -470,20 +736,20 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
if err == nil {
// .versions directory exists
- glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
+ glog.V(3).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
for _, entry := range entries {
if entry.Extended != nil {
if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
versionId := string(versionIdBytes)
- glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
+ glog.V(3).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
if versionId == "null" {
// Only delete null version - preserve real versioned entries
- glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions")
+ glog.V(3).Infof("putSuspendedVersioningObject: deleting null version from .versions")
err := s3a.rm(versionsDir, entry.Name, true, false)
if err != nil {
glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err)
} else {
- glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version")
+ glog.V(3).Infof("putSuspendedVersioningObject: successfully deleted null version")
}
break
}
@@ -491,13 +757,12 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
}
} else {
- glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
+ glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
}
uploadUrl := s3a.toFilerUrl(bucket, normalizedObject)
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
+ body := dataReader
if objectContentType == "" {
body = mimeDetect(r, body)
}
@@ -508,10 +773,6 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
// Set version ID to "null" for suspended versioning
r.Header.Set(s3_constants.ExtVersionIdKey, "null")
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
- s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey))
- }
// Extract and set object lock metadata as headers
// This handles retention mode, retention date, and legal hold
@@ -528,7 +789,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
if err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
- return "", s3err.ErrInvalidRequest
+ return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
@@ -540,7 +801,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
} else {
glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
- return "", s3err.ErrInvalidRequest
+ return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
}
@@ -562,43 +823,10 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
// Upload the file using putToFiler - this will create the file with version metadata
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===")
- }
- etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
+ etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
- return "", errCode
- }
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
- }
-
- // Verify the metadata was set correctly during file creation
- if isTestObj {
- // Read back the entry to verify
- maxRetries := 3
- for attempt := 1; attempt <= maxRetries; attempt++ {
- verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject)
- if verifyErr == nil {
- glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
- if verifyEntry.Extended != nil {
- if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok {
- glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
- } else {
- glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
- }
- } else {
- glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
- }
- break
- } else {
- glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
- }
- if attempt < maxRetries {
- time.Sleep(time.Millisecond * 10)
- }
- }
+ return "", errCode, SSEResponseMetadata{}
}
// Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
@@ -609,10 +837,8 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
- }
- return etag, s3err.ErrNone
+
+ return etag, s3err.ErrNone, sseMetadata
}
// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
@@ -684,7 +910,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
return nil
}
-func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// Generate version ID
versionId = generateVersionId()
@@ -709,21 +935,20 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
+ body := dataReader
if objectContentType == "" {
body = mimeDetect(r, body)
}
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
- etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
+ etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
- return "", "", errCode
+ return "", "", errCode, SSEResponseMetadata{}
}
// Get the uploaded entry to add versioning metadata
@@ -745,7 +970,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
if err != nil {
glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Add versioning metadata to this version
@@ -766,7 +991,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// Extract and store object lock metadata from request headers
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
- return "", "", s3err.ErrInvalidRequest
+ return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
// Update the version entry with metadata
@@ -777,17 +1002,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Update the .versions directory metadata to indicate this is the latest version
err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
if err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
- return versionId, etag, s3err.ErrNone
+ return versionId, etag, s3err.ErrNone, sseMetadata
}
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
@@ -897,7 +1122,16 @@ func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, en
func (s3a *S3ApiServer) applyBucketDefaultEncryption(bucket string, r *http.Request, dataReader io.Reader) (*BucketDefaultEncryptionResult, error) {
// Check if bucket has default encryption configured
encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket)
- if err != nil || encryptionConfig == nil {
+ if err != nil {
+ // Check if this is just "no encryption configured" vs a real error
+ if errors.Is(err, ErrNoEncryptionConfig) {
+ // No default encryption configured, return original reader
+ return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
+ }
+ // Real error - propagate to prevent silent encryption bypass
+ return nil, fmt.Errorf("failed to read bucket encryption config: %v", err)
+ }
+ if encryptionConfig == nil {
// No default encryption configured, return original reader
return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
}
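The hunk above separates "no bucket encryption configured" (the ErrNoEncryptionConfig sentinel) from genuine lookup failures, so that real errors can no longer silently bypass default encryption. The pattern is ordinary errors.Is against a sentinel; below is a minimal self-contained sketch with a stand-in lookup function, assuming the sentinel's exact definition differs in the real code.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoEncryptionConfig mirrors the sentinel referenced in the diff; the real
// definition lives elsewhere in the s3api package.
var ErrNoEncryptionConfig = errors.New("no bucket encryption configuration")

// lookupConfig is a stand-in for GetBucketEncryptionConfig.
func lookupConfig(bucket string) (string, error) {
	switch bucket {
	case "plain-bucket":
		return "", ErrNoEncryptionConfig
	case "broken-bucket":
		return "", errors.New("filer unreachable")
	default:
		return "SSE-S3 default", nil
	}
}

func main() {
	for _, b := range []string{"plain-bucket", "broken-bucket", "encrypted-bucket"} {
		cfg, err := lookupConfig(b)
		switch {
		case errors.Is(err, ErrNoEncryptionConfig):
			fmt.Println(b, "-> no default encryption, store as-is")
		case err != nil:
			fmt.Println(b, "-> hard failure, reject the PUT:", err) // never silently skip encryption
		default:
			fmt.Println(b, "-> apply", cfg)
		}
	}
}
```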
@@ -963,7 +1197,8 @@ func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Requ
bucketKeyEnabled := encryptionConfig.BucketKeyEnabled
// Build encryption context for KMS
- bucket, object := s3_constants.GetBucketAndObject(r)
+ // Use bucket parameter passed to function (not from request parsing)
+ _, object := s3_constants.GetBucketAndObject(r)
encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled)
// Create SSE-KMS encrypted reader
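BuildEncryptionContext(bucket, object, bucketKeyEnabled) above supplies the KMS encryption context used when generating the data key. Under AWS's documented convention this context is a one-entry map keyed by "aws:s3:arn", holding the bucket ARN when S3 Bucket Keys are enabled and the object ARN otherwise; whether SeaweedFS builds exactly this map is an assumption here, sketched only to show what the parameters feed into.

```go
package main

import "fmt"

// buildEncryptionContextSketch is a hypothetical stand-in for the
// BuildEncryptionContext call in the diff, following AWS's documented
// convention: with S3 Bucket Keys the context is the bucket ARN, otherwise
// the object ARN, under the "aws:s3:arn" key.
func buildEncryptionContextSketch(bucket, object string, bucketKeyEnabled bool) map[string]string {
	arn := "arn:aws:s3:::" + bucket
	if !bucketKeyEnabled {
		arn += "/" + object
	}
	return map[string]string{"aws:s3:arn": arn}
}

func main() {
	fmt.Println(buildEncryptionContextSketch("my-bucket", "photos/cat.jpg", false))
	fmt.Println(buildEncryptionContextSketch("my-bucket", "photos/cat.jpg", true))
}
```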
@@ -1474,3 +1709,88 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe
func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket, object string) ConditionalHeaderResult {
return s3a.checkConditionalHeadersForReadsWithGetter(s3a, r, bucket, object)
}
+
+// deleteOrphanedChunks attempts to delete chunks that were uploaded but whose entry creation failed
+// This prevents resource leaks and wasted storage. Errors are logged but don't prevent cleanup attempts.
+func (s3a *S3ApiServer) deleteOrphanedChunks(chunks []*filer_pb.FileChunk) {
+ if len(chunks) == 0 {
+ return
+ }
+
+ // Extract file IDs from chunks
+ var fileIds []string
+ for _, chunk := range chunks {
+ if chunk.GetFileIdString() != "" {
+ fileIds = append(fileIds, chunk.GetFileIdString())
+ }
+ }
+
+ if len(fileIds) == 0 {
+ glog.Warningf("deleteOrphanedChunks: no valid file IDs found in %d chunks", len(chunks))
+ return
+ }
+
+ glog.V(3).Infof("deleteOrphanedChunks: attempting to delete %d file IDs: %v", len(fileIds), fileIds)
+
+ // Create a lookup function that queries the filer for volume locations
+ // This is similar to createLookupFileIdFunction but returns the format needed by DeleteFileIdsWithLookupVolumeId
+ lookupFunc := func(vids []string) (map[string]*operation.LookupResult, error) {
+ results := make(map[string]*operation.LookupResult)
+
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Query filer for all volume IDs at once
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: vids,
+ })
+ if err != nil {
+ return err
+ }
+
+ // Convert filer response to operation.LookupResult format
+ for vid, locs := range resp.LocationsMap {
+ result := &operation.LookupResult{
+ VolumeOrFileId: vid,
+ }
+
+ for _, loc := range locs.Locations {
+ result.Locations = append(result.Locations, operation.Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.DataCenter,
+ GrpcPort: int(loc.GrpcPort),
+ })
+ }
+
+ results[vid] = result
+ }
+ return nil
+ })
+
+ return results, err
+ }
+
+ // Attempt deletion using the operation package's batch delete with custom lookup
+ deleteResults := operation.DeleteFileIdsWithLookupVolumeId(s3a.option.GrpcDialOption, fileIds, lookupFunc)
+
+ // Log results - track successes and failures
+ successCount := 0
+ failureCount := 0
+ for _, result := range deleteResults {
+ if result.Error != "" {
+ glog.Warningf("deleteOrphanedChunks: failed to delete chunk %s: %s (status: %d)",
+ result.FileId, result.Error, result.Status)
+ failureCount++
+ } else {
+ glog.V(4).Infof("deleteOrphanedChunks: successfully deleted chunk %s (size: %d bytes)",
+ result.FileId, result.Size)
+ successCount++
+ }
+ }
+
+ if failureCount > 0 {
+ glog.Warningf("deleteOrphanedChunks: cleanup completed with %d successes and %d failures out of %d chunks",
+ successCount, failureCount, len(fileIds))
+ } else {
+ glog.V(3).Infof("deleteOrphanedChunks: successfully deleted all %d orphaned chunks", successCount)
+ }
+}
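deleteOrphanedChunks above resolves chunk locations by volume ID before issuing the batched delete. For readers unfamiliar with SeaweedFS file IDs: an ID such as "3,01637037d6" has the form "<volumeId>,<needle+cookie>", so the lookup keys are just the portion before the comma. A small hypothetical helper, not part of this change, makes that mapping concrete:

```go
package main

import (
	"fmt"
	"strings"
)

// volumeIdOf extracts the volume id from a SeaweedFS file id of the form
// "<volumeId>,<needleIdCookie>". Illustrative only; the operation package
// has its own parsing.
func volumeIdOf(fileId string) string {
	if i := strings.Index(fileId, ","); i > 0 {
		return fileId[:i]
	}
	return fileId
}

func main() {
	fileIds := []string{"3,01637037d6", "7,2b5c8e01aa"}
	vids := make([]string, 0, len(fileIds))
	for _, fid := range fileIds {
		vids = append(vids, volumeIdOf(fid))
	}
	fmt.Println(vids) // [3 7] — these are what the lookup function receives
}
```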