Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--  weed/s3api/s3api_object_handlers_put.go | 694
1 file changed, 507 insertions, 187 deletions
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 6ce48429f..f7105052e 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -1,25 +1,28 @@ package s3api import ( - "crypto/md5" + "context" "encoding/base64" "encoding/json" "errors" "fmt" "io" "net/http" + "net/url" + "path/filepath" "strconv" "strings" "time" "github.com/pquerna/cachecontrol/cacheobject" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/security" - weed_server "github.com/seaweedfs/seaweedfs/weed/server" stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/util/constants" ) @@ -60,6 +63,13 @@ type BucketDefaultEncryptionResult struct { SSEKMSKey *SSEKMSKey } +// SSEResponseMetadata holds encryption metadata needed for HTTP response headers +type SSEResponseMetadata struct { + SSEType string + KMSKeyID string + BucketKeyEnabled bool +} + func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html @@ -135,7 +145,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) versioningEnabled := (versioningState == s3_constants.VersioningEnabled) versioningConfigured := (versioningState != "") - glog.V(2).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured) + glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured) // Validate object lock headers before processing if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil { @@ -158,29 +168,34 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) switch versioningState { case s3_constants.VersioningEnabled: // Handle enabled versioning - create new versions with real version IDs - glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object) - versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) + glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object) + versionId, etag, errCode, sseMetadata := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType) if errCode != s3err.ErrNone { glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object) s3err.WriteErrorResponse(w, r, errCode) return } - glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object) + glog.V(3).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object) // Set version ID in response header if versionId != "" { w.Header().Set("x-amz-version-id", versionId) - glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, 
object) + glog.V(3).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object) } else { glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object) } // Set ETag in response setEtag(w, etag) + + // Set SSE response headers for versioned objects + s3a.setSSEResponseHeaders(w, r, sseMetadata) + case s3_constants.VersioningSuspended: // Handle suspended versioning - overwrite with "null" version ID but preserve existing versions - etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) + glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object) + etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return @@ -191,6 +206,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) // Set ETag in response setEtag(w, etag) + + // Set SSE response headers for suspended versioning + s3a.setSSEResponseHeaders(w, r, sseMetadata) default: // Handle regular PUT (never configured versioning) uploadUrl := s3a.toFilerUrl(bucket, object) @@ -198,7 +216,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) dataReader = mimeDetect(r, dataReader) } - etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1) + etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -209,9 +227,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) setEtag(w, etag) // Set SSE response headers based on encryption type used - if sseType == s3_constants.SSETypeS3 { - w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) - } + s3a.setSSEResponseHeaders(w, r, sseMetadata) } } stats_collect.RecordBucketActiveTime(bucket) @@ -220,15 +236,18 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) writeSuccessResponseEmpty(w, r) } -func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) { - // Calculate unique offset for each part to prevent IV reuse in multipart uploads - // This is critical for CTR mode encryption security - partOffset := calculatePartOffset(partNumber) +func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { + // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy + // This eliminates the filer proxy overhead for PUT operations + + // For SSE, encrypt with offset=0 for all parts + // Each part is encrypted independently, then decrypted using metadata during GET + partOffset := int64(0) - // Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments + // Handle all SSE encryption types in a unified manner sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset) if sseErrorCode != s3err.ErrNone { - return "", sseErrorCode, "" + return "", sseErrorCode, SSEResponseMetadata{} } // Extract results from unified SSE handling @@ -239,6 +258,7 @@ func (s3a *S3ApiServer) 
putToFiler(r *http.Request, uploadUrl string, dataReader sseKMSMetadata := sseResult.SSEKMSMetadata sseS3Key := sseResult.SSES3Key sseS3Metadata := sseResult.SSES3Metadata + sseType := sseResult.SSEType // Apply bucket default encryption if no explicit encryption was provided // This implements AWS S3 behavior where bucket default encryption automatically applies @@ -249,7 +269,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader) if applyErr != nil { glog.Errorf("Failed to apply bucket default encryption: %v", applyErr) - return "", s3err.ErrInternalError, "" + return "", s3err.ErrInternalError, SSEResponseMetadata{} } // Update variables based on the result @@ -257,121 +277,357 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader sseS3Key = encryptionResult.SSES3Key sseKMSKey = encryptionResult.SSEKMSKey + // If bucket-default encryption selected an algorithm, reflect it in SSE type + if sseType == "" { + if sseS3Key != nil { + sseType = s3_constants.SSETypeS3 + } else if sseKMSKey != nil { + sseType = s3_constants.SSETypeKMS + } + } + // If SSE-S3 was applied by bucket default, prepare metadata (if not already done) if sseS3Key != nil && len(sseS3Metadata) == 0 { var metaErr error sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key) if metaErr != nil { glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr) - return "", s3err.ErrInternalError, "" + return "", s3err.ErrInternalError, SSEResponseMetadata{} } } } else { glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption") } - hash := md5.New() - var body = io.TeeReader(dataReader, hash) + // Parse the upload URL to extract the file path + // uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.) 
+ // Use proper URL parsing instead of string manipulation for robustness + parsedUrl, parseErr := url.Parse(uploadUrl) + if parseErr != nil { + glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr) + return "", s3err.ErrInternalError, SSEResponseMetadata{} + } + + // Use parsedUrl.Path directly - it's already decoded by url.Parse() + // Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/" + // Calling PathUnescape again would double-decode and fail on keys like "b%ar" + filePath := parsedUrl.Path - proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body) + // Step 1 & 2: Use auto-chunking to handle large files without OOM + // This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers + const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard) + const smallFileLimit = 256 * 1024 // 256KB - store inline in filer + collection := "" + if s3a.option.FilerGroup != "" { + collection = s3a.getCollectionName(bucket) + } + + // Create assign function for chunked upload + assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) { + var assignResult *filer_pb.AssignVolumeResponse + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{ + Count: int32(count), + Replication: "", + Collection: collection, + DiskType: "", + DataCenter: s3a.option.DataCenter, + Path: filePath, + }) + if err != nil { + return fmt.Errorf("assign volume: %w", err) + } + if resp.Error != "" { + return fmt.Errorf("assign volume: %v", resp.Error) + } + assignResult = resp + return nil + }) + if err != nil { + return nil, nil, err + } + + // Convert filer_pb.AssignVolumeResponse to operation.AssignResult + return nil, &operation.AssignResult{ + Fid: assignResult.FileId, + Url: assignResult.Location.Url, + PublicUrl: assignResult.Location.PublicUrl, + Count: uint64(count), + Auth: security.EncodedJwt(assignResult.Auth), + }, nil + } + + // Upload with auto-chunking + // Use context.Background() to ensure chunk uploads complete even if HTTP request is cancelled + // This prevents partial uploads and data corruption + chunkResult, err := operation.UploadReaderInChunks(context.Background(), dataReader, &operation.ChunkedUploadOption{ + ChunkSize: chunkSize, + SmallFileLimit: smallFileLimit, + Collection: collection, + DataCenter: s3a.option.DataCenter, + SaveSmallInline: false, // S3 API always creates chunks, never stores inline + MimeType: r.Header.Get("Content-Type"), + AssignFunc: assignFunc, + }) if err != nil { - glog.Errorf("NewRequest %s: %v", uploadUrl, err) - return "", s3err.ErrInternalError, "" - } + glog.Errorf("putToFiler: chunked upload failed: %v", err) + + // CRITICAL: Cleanup orphaned chunks before returning error + // UploadReaderInChunks now returns partial results even on error, + // allowing us to cleanup any chunks that were successfully uploaded + // before the failure occurred + if chunkResult != nil && len(chunkResult.FileChunks) > 0 { + glog.Warningf("putToFiler: Upload failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks)) + s3a.deleteOrphanedChunks(chunkResult.FileChunks) + } - proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) - if destination != "" { - proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination) + if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) { + return 
"", s3err.ErrInvalidDigest, SSEResponseMetadata{} + } + return "", s3err.ErrInternalError, SSEResponseMetadata{} } - if s3a.option.FilerGroup != "" { - query := proxyReq.URL.Query() - query.Add("collection", s3a.getCollectionName(bucket)) - proxyReq.URL.RawQuery = query.Encode() - } + // Step 3: Calculate MD5 hash and add SSE metadata to chunks + md5Sum := chunkResult.Md5Hash.Sum(nil) - for header, values := range r.Header { - for _, value := range values { - proxyReq.Header.Add(header, value) + glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d", + filePath, len(chunkResult.FileChunks), chunkResult.TotalSize) + + // Log chunk details for debugging (verbose only - high frequency) + if glog.V(4) { + for i, chunk := range chunkResult.FileChunks { + glog.Infof(" PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) } } - // Log version ID header for debugging - if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" { - glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl) + // Add SSE metadata to all chunks if present + for _, chunk := range chunkResult.FileChunks { + switch { + case customerKey != nil: + // SSE-C: Create per-chunk metadata (matches filer logic) + chunk.SseType = filer_pb.SSEType_SSE_C + if len(sseIV) > 0 { + // PartOffset tracks position within the encrypted stream + // Since ALL uploads (single-part and multipart parts) encrypt starting from offset 0, + // PartOffset = chunk.Offset represents where this chunk is in that encrypted stream + // - Single-part: chunk.Offset is position in the file's encrypted stream + // - Multipart: chunk.Offset is position in this part's encrypted stream + ssecMetadataStruct := struct { + Algorithm string `json:"algorithm"` + IV string `json:"iv"` + KeyMD5 string `json:"keyMD5"` + PartOffset int64 `json:"partOffset"` + }{ + Algorithm: "AES256", + IV: base64.StdEncoding.EncodeToString(sseIV), + KeyMD5: customerKey.KeyMD5, + PartOffset: chunk.Offset, // Position within the encrypted stream (always encrypted from 0) + } + if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil { + chunk.SseMetadata = ssecMetadata + } + } + case sseKMSKey != nil: + // SSE-KMS: Create per-chunk metadata with chunk-specific offsets + // Each chunk needs its own metadata with ChunkOffset set for proper IV calculation during decryption + chunk.SseType = filer_pb.SSEType_SSE_KMS + + // Create a copy of the SSE-KMS key with chunk-specific offset + chunkSSEKey := &SSEKMSKey{ + KeyID: sseKMSKey.KeyID, + EncryptedDataKey: sseKMSKey.EncryptedDataKey, + EncryptionContext: sseKMSKey.EncryptionContext, + BucketKeyEnabled: sseKMSKey.BucketKeyEnabled, + IV: sseKMSKey.IV, + ChunkOffset: chunk.Offset, // Set chunk-specific offset for IV calculation + } + + // Serialize per-chunk metadata + if chunkMetadata, serErr := SerializeSSEKMSMetadata(chunkSSEKey); serErr == nil { + chunk.SseMetadata = chunkMetadata + } else { + glog.Errorf("Failed to serialize SSE-KMS metadata for chunk at offset %d: %v", chunk.Offset, serErr) + } + case sseS3Key != nil: + // SSE-S3: Create per-chunk metadata with chunk-specific IVs + // Each chunk needs its own IV calculated from the base IV + chunk offset + chunk.SseType = filer_pb.SSEType_SSE_S3 + + // Calculate chunk-specific IV using base IV and chunk offset + chunkIV, _ := calculateIVWithOffset(sseS3Key.IV, chunk.Offset) + + // Create a copy of the 
SSE-S3 key with chunk-specific IV + chunkSSEKey := &SSES3Key{ + Key: sseS3Key.Key, + KeyID: sseS3Key.KeyID, + Algorithm: sseS3Key.Algorithm, + IV: chunkIV, // Use chunk-specific IV + } + + // Serialize per-chunk metadata + if chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey); serErr == nil { + chunk.SseMetadata = chunkMetadata + } else { + glog.Errorf("Failed to serialize SSE-S3 metadata for chunk at offset %d: %v", chunk.Offset, serErr) + } + } } - // Set object owner header for filer to extract + // Step 4: Create metadata entry + now := time.Now() + mimeType := r.Header.Get("Content-Type") + if mimeType == "" { + mimeType = "application/octet-stream" + } + + // Create entry + entry := &filer_pb.Entry{ + Name: filepath.Base(filePath), + IsDirectory: false, + Attributes: &filer_pb.FuseAttributes{ + Crtime: now.Unix(), + Mtime: now.Unix(), + FileMode: 0660, + Uid: 0, + Gid: 0, + Mime: mimeType, + FileSize: uint64(chunkResult.TotalSize), + }, + Chunks: chunkResult.FileChunks, // All chunks from auto-chunking + Extended: make(map[string][]byte), + } + + // Set Md5 attribute based on context: + // 1. For multipart upload PARTS (stored in .uploads/ directory): ALWAYS set Md5 + // - Parts must use simple MD5 ETags, never composite format + // - Even if a part has multiple chunks internally, its ETag is MD5 of entire part + // 2. For regular object uploads: only set Md5 for single-chunk uploads + // - Multi-chunk regular objects use composite "md5-count" format + isMultipartPart := strings.Contains(filePath, "/"+s3_constants.MultipartUploadsFolder+"/") + if isMultipartPart || len(chunkResult.FileChunks) == 1 { + entry.Attributes.Md5 = md5Sum + } + + // Calculate ETag using the same logic as GET to ensure consistency + // For single chunk: uses entry.Attributes.Md5 + // For multiple chunks: uses filer.ETagChunks() which returns "<hash>-<count>" + etag = filer.ETag(entry) + glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks)) + + // Set object owner amzAccountId := r.Header.Get(s3_constants.AmzAccountId) if amzAccountId != "" { - proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId) - glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl) + entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId) + glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath) + } + + // Set version ID if present + if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" { + entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader) + glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath) + } + + // Set TTL-based S3 expiry flag only if object has a TTL + if entry.Attributes.TtlSec > 0 { + entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") + } + + // Copy user metadata and standard headers + for k, v := range r.Header { + if len(v) > 0 && len(v[0]) > 0 { + if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) { + // Go's HTTP server canonicalizes headers (e.g., x-amz-meta-foo → X-Amz-Meta-Foo) + // We store them as they come in (after canonicalization) to preserve the user's intent + entry.Extended[k] = []byte(v[0]) + } else if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" { + entry.Extended[k] = []byte(v[0]) + } + if k == "Response-Content-Disposition" { + entry.Extended["Content-Disposition"] = []byte(v[0]) + } + } } - // Set SSE-C metadata headers for the 
filer if encryption was applied + // Set SSE-C metadata if customerKey != nil && len(sseIV) > 0 { - proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256") - proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5) - // Store IV in a custom header that the filer can use to store in entry metadata - proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV)) + // Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes) + entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV + entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256") + entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5) + glog.V(3).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV)) } - // Set SSE-KMS metadata headers for the filer if KMS encryption was applied + // Set SSE-KMS metadata if sseKMSKey != nil { - // Use already-serialized SSE-KMS metadata from helper function - // Store serialized KMS metadata in a custom header that the filer can use - proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata)) - - glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID) - } else { - glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected") + // Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes) + entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata + // Set standard SSE headers for detection + entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms") + entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID) + glog.V(3).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata)) } - // Set SSE-S3 metadata headers for the filer if S3 encryption was applied + // Set SSE-S3 metadata if sseS3Key != nil && len(sseS3Metadata) > 0 { - // Store serialized S3 metadata in a custom header that the filer can use - proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata)) - glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID) - } - // Set TTL-based S3 expiry (modification time) - proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true") - // ensure that the Authorization header is overriding any previous - // Authorization header which might be already present in proxyReq - s3a.maybeAddFilerJwtAuthorization(proxyReq, true) - resp, postErr := s3a.client.Do(proxyReq) - - if postErr != nil { - glog.Errorf("post to filer: %v", postErr) - if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) { - return "", s3err.ErrInvalidDigest, "" + // Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes) + entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata + // Set standard SSE header for detection + entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256") + glog.V(3).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata)) + } + + // Step 4: Save metadata to filer via gRPC + // Use context.Background() to ensure metadata save completes even if HTTP request is cancelled + // This matches the chunk upload behavior and 
prevents orphaned chunks + glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d", + filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended)) + createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + req := &filer_pb.CreateEntryRequest{ + Directory: filepath.Dir(filePath), + Entry: entry, + } + glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath) + _, err := client.CreateEntry(context.Background(), req) + if err != nil { + glog.Errorf("putToFiler: CreateEntry returned error: %v", err) } - return "", s3err.ErrInternalError, "" + return err + }) + if createErr != nil { + glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr) + + // CRITICAL: Cleanup orphaned chunks before returning error + // If CreateEntry fails, the uploaded chunks are orphaned and must be deleted + // to prevent resource leaks and wasted storage + if len(chunkResult.FileChunks) > 0 { + glog.Warningf("putToFiler: CreateEntry failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks)) + s3a.deleteOrphanedChunks(chunkResult.FileChunks) + } + + return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{} } - defer resp.Body.Close() + glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath) - etag = fmt.Sprintf("%x", hash.Sum(nil)) + glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d", + filePath, etag, entry.Attributes.FileSize, partNumber) - resp_body, ra_err := io.ReadAll(resp.Body) - if ra_err != nil { - glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err) - return etag, s3err.ErrInternalError, "" - } - var ret weed_server.FilerPostResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body)) - return "", s3err.ErrInternalError, "" - } - if ret.Error != "" { - glog.Errorf("upload to filer error: %v", ret.Error) - return "", filerErrorToS3Error(ret.Error), "" + BucketTrafficReceived(chunkResult.TotalSize, r) + + // Build SSE response metadata with encryption details + responseMetadata := SSEResponseMetadata{ + SSEType: sseType, } - BucketTrafficReceived(ret.Size, r) + // For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata + if sseKMSKey != nil { + responseMetadata.KMSKeyID = sseKMSKey.KeyID + responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled + glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v", + sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled) + } - // Return the SSE type determined by the unified handler - return etag, s3err.ErrNone, sseResult.SSEType + return etag, s3err.ErrNone, responseMetadata } func setEtag(w http.ResponseWriter, etag string) { @@ -384,6 +640,43 @@ func setEtag(w http.ResponseWriter, etag string) { } } +// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type +func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) { + switch sseMetadata.SSEType { + case s3_constants.SSETypeS3: + // SSE-S3: Return the encryption algorithm + w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) + + case s3_constants.SSETypeC: + // SSE-C: Echo back the customer-provided algorithm and key MD5 + if algo := 
r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo) + } + if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5) + } + + case s3_constants.SSETypeKMS: + // SSE-KMS: Return the KMS key ID and algorithm + w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms") + + // Use metadata from stored encryption config (for bucket-default encryption) + // or fall back to request headers (for explicit encryption) + if sseMetadata.KMSKeyID != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID) + } else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" { + w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID) + } + + // Set bucket-key-enabled header if it was enabled + if sseMetadata.BucketKeyEnabled { + w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") + } else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" { + w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true") + } + } +} + func filerErrorToS3Error(errString string) s3err.ErrorCode { switch { case errString == constants.ErrMsgBadDigest: @@ -400,26 +693,6 @@ func filerErrorToS3Error(errString string) s3err.ErrorCode { } } -func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) { - encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite) - - if encodedJwt == "" { - return - } - - r.Header.Set("Authorization", "BEARER "+string(encodedJwt)) -} - -func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string { - var encodedJwt security.EncodedJwt - if isWrite { - encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec) - } else { - encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec) - } - return string(encodedJwt) -} - // setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) { amzAccountId := r.Header.Get(s3_constants.AmzAccountId) @@ -446,19 +719,12 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_ // // For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory, // while existing versions from when versioning was enabled remain preserved in the .versions subdirectory. 
-func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) { +func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { // Normalize object path to ensure consistency with toFilerUrl behavior normalizedObject := removeDuplicateSlashes(object) - // Enable detailed logging for testobjbar - isTestObj := (normalizedObject == "testobjbar") - - glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v", - bucket, object, normalizedObject, isTestObj) - - if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===") - } + glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s", + bucket, object, normalizedObject) bucketDir := s3a.option.BucketsPath + "/" + bucket @@ -470,20 +736,20 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob entries, _, err := s3a.list(versionsDir, "", "", false, 1000) if err == nil { // .versions directory exists - glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object) + glog.V(3).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object) for _, entry := range entries { if entry.Extended != nil { if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok { versionId := string(versionIdBytes) - glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId) + glog.V(3).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId) if versionId == "null" { // Only delete null version - preserve real versioned entries - glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions") + glog.V(3).Infof("putSuspendedVersioningObject: deleting null version from .versions") err := s3a.rm(versionsDir, entry.Name, true, false) if err != nil { glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err) } else { - glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version") + glog.V(3).Infof("putSuspendedVersioningObject: successfully deleted null version") } break } @@ -491,13 +757,12 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob } } } else { - glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) + glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) } uploadUrl := s3a.toFilerUrl(bucket, normalizedObject) - hash := md5.New() - var body = io.TeeReader(dataReader, hash) + body := dataReader if objectContentType == "" { body = mimeDetect(r, body) } @@ -508,10 +773,6 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob // Set version ID to "null" for suspended versioning r.Header.Set(s3_constants.ExtVersionIdKey, "null") - if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===", - s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey)) - } // Extract and set object lock metadata as headers // This handles retention mode, retention date, and legal hold @@ -528,7 +789,7 @@ func (s3a *S3ApiServer) 
putSuspendedVersioningObject(r *http.Request, bucket, ob parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate) if err != nil { glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err) - return "", s3err.ErrInvalidRequest + return "", s3err.ErrInvalidRequest, SSEResponseMetadata{} } r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10)) glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix()) @@ -540,7 +801,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold) } else { glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold) - return "", s3err.ErrInvalidRequest + return "", s3err.ErrInvalidRequest, SSEResponseMetadata{} } } @@ -562,43 +823,10 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob } // Upload the file using putToFiler - this will create the file with version metadata - if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===") - } - etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1) + etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1) if errCode != s3err.ErrNone { glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) - return "", errCode - } - if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag) - } - - // Verify the metadata was set correctly during file creation - if isTestObj { - // Read back the entry to verify - maxRetries := 3 - for attempt := 1; attempt <= maxRetries; attempt++ { - verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject) - if verifyErr == nil { - glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended) - if verifyEntry.Extended != nil { - if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok { - glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes)) - } else { - glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===") - } - } else { - glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===") - } - break - } else { - glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr) - } - if attempt < maxRetries { - time.Sleep(time.Millisecond * 10) - } - } + return "", errCode, SSEResponseMetadata{} } // Update all existing versions/delete markers to set IsLatest=false since "null" is now latest @@ -609,10 +837,8 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob } glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object) - if isTestObj { - glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===") - } - return etag, s3err.ErrNone + + return etag, s3err.ErrNone, sseMetadata } // updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers @@ -684,7 +910,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object return nil } -func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) { +func (s3a *S3ApiServer) 
putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { // Generate version ID versionId = generateVersionId() @@ -709,21 +935,20 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin }) if err != nil { glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } - hash := md5.New() - var body = io.TeeReader(dataReader, hash) + body := dataReader if objectContentType == "" { body = mimeDetect(r, body) } glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl) - etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1) + etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1) if errCode != s3err.ErrNone { glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) - return "", "", errCode + return "", "", errCode, SSEResponseMetadata{} } // Get the uploaded entry to add versioning metadata @@ -745,7 +970,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin if err != nil { glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } // Add versioning metadata to this version @@ -766,7 +991,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Extract and store object lock metadata from request headers if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil { glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err) - return "", "", s3err.ErrInvalidRequest + return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{} } // Update the version entry with metadata @@ -777,17 +1002,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin }) if err != nil { glog.Errorf("putVersionedObject: failed to update version metadata: %v", err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } // Update the .versions directory metadata to indicate this is the latest version err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName) if err != nil { glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err) - return "", "", s3err.ErrInternalError + return "", "", s3err.ErrInternalError, SSEResponseMetadata{} } glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) - return versionId, etag, s3err.ErrNone + return versionId, etag, s3err.ErrNone, sseMetadata } // updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version @@ -897,7 +1122,16 @@ func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, en func (s3a *S3ApiServer) applyBucketDefaultEncryption(bucket string, r *http.Request, dataReader io.Reader) (*BucketDefaultEncryptionResult, error) { // Check if bucket has default encryption configured encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket) - if err != nil || encryptionConfig == nil { + if err != nil { + // Check if this is just 
"no encryption configured" vs a real error + if errors.Is(err, ErrNoEncryptionConfig) { + // No default encryption configured, return original reader + return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil + } + // Real error - propagate to prevent silent encryption bypass + return nil, fmt.Errorf("failed to read bucket encryption config: %v", err) + } + if encryptionConfig == nil { // No default encryption configured, return original reader return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil } @@ -963,7 +1197,8 @@ func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Requ bucketKeyEnabled := encryptionConfig.BucketKeyEnabled // Build encryption context for KMS - bucket, object := s3_constants.GetBucketAndObject(r) + // Use bucket parameter passed to function (not from request parsing) + _, object := s3_constants.GetBucketAndObject(r) encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled) // Create SSE-KMS encrypted reader @@ -1474,3 +1709,88 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket, object string) ConditionalHeaderResult { return s3a.checkConditionalHeadersForReadsWithGetter(s3a, r, bucket, object) } + +// deleteOrphanedChunks attempts to delete chunks that were uploaded but whose entry creation failed +// This prevents resource leaks and wasted storage. Errors are logged but don't prevent cleanup attempts. +func (s3a *S3ApiServer) deleteOrphanedChunks(chunks []*filer_pb.FileChunk) { + if len(chunks) == 0 { + return + } + + // Extract file IDs from chunks + var fileIds []string + for _, chunk := range chunks { + if chunk.GetFileIdString() != "" { + fileIds = append(fileIds, chunk.GetFileIdString()) + } + } + + if len(fileIds) == 0 { + glog.Warningf("deleteOrphanedChunks: no valid file IDs found in %d chunks", len(chunks)) + return + } + + glog.V(3).Infof("deleteOrphanedChunks: attempting to delete %d file IDs: %v", len(fileIds), fileIds) + + // Create a lookup function that queries the filer for volume locations + // This is similar to createLookupFileIdFunction but returns the format needed by DeleteFileIdsWithLookupVolumeId + lookupFunc := func(vids []string) (map[string]*operation.LookupResult, error) { + results := make(map[string]*operation.LookupResult) + + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Query filer for all volume IDs at once + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return err + } + + // Convert filer response to operation.LookupResult format + for vid, locs := range resp.LocationsMap { + result := &operation.LookupResult{ + VolumeOrFileId: vid, + } + + for _, loc := range locs.Locations { + result.Locations = append(result.Locations, operation.Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + DataCenter: loc.DataCenter, + GrpcPort: int(loc.GrpcPort), + }) + } + + results[vid] = result + } + return nil + }) + + return results, err + } + + // Attempt deletion using the operation package's batch delete with custom lookup + deleteResults := operation.DeleteFileIdsWithLookupVolumeId(s3a.option.GrpcDialOption, fileIds, lookupFunc) + + // Log results - track successes and failures + successCount := 0 + failureCount := 0 + for _, result := range deleteResults { + if result.Error != "" { + glog.Warningf("deleteOrphanedChunks: failed to delete 
chunk %s: %s (status: %d)", + result.FileId, result.Error, result.Status) + failureCount++ + } else { + glog.V(4).Infof("deleteOrphanedChunks: successfully deleted chunk %s (size: %d bytes)", + result.FileId, result.Size) + successCount++ + } + } + + if failureCount > 0 { + glog.Warningf("deleteOrphanedChunks: cleanup completed with %d successes and %d failures out of %d chunks", + successCount, failureCount, len(fileIds)) + } else { + glog.V(3).Infof("deleteOrphanedChunks: successfully deleted all %d orphaned chunks", successCount) + } +} |
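
The heart of the change is putToFiler's new write path: instead of proxying the PUT body through the filer's HTTP endpoint, the S3 API now assigns volumes over the filer gRPC API, streams the body to volume servers in 8 MB chunks via operation.UploadReaderInChunks, and finally persists the metadata entry with CreateEntry. The condensed sketch below mirrors that flow using the same packages the diff imports; the helper name uploadViaChunks and the trimmed error handling (resp.Error checks, JWT auth, SSE metadata, orphaned-chunk cleanup) are illustrative, not part of the patch.

package s3api

import (
	"context"
	"io"
	"path/filepath"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/security"
)

// uploadViaChunks is a condensed sketch of the new direct-to-volume write path.
func uploadViaChunks(ctx context.Context, client filer_pb.SeaweedFilerClient,
	dataReader io.Reader, filePath, collection, dataCenter, mimeType string) (*filer_pb.Entry, error) {

	// Ask the filer to assign a volume location for each chunk to be written.
	assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
		resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
			Count:      int32(count),
			Collection: collection,
			DataCenter: dataCenter,
			Path:       filePath,
		})
		if err != nil {
			return nil, nil, err
		}
		return nil, &operation.AssignResult{
			Fid:       resp.FileId,
			Url:       resp.Location.Url,
			PublicUrl: resp.Location.PublicUrl,
			Count:     uint64(count),
			Auth:      security.EncodedJwt(resp.Auth),
		}, nil
	}

	// Stream the request body to volume servers in 8 MB chunks.
	chunkResult, err := operation.UploadReaderInChunks(ctx, dataReader, &operation.ChunkedUploadOption{
		ChunkSize:       8 * 1024 * 1024,
		SmallFileLimit:  256 * 1024,
		Collection:      collection,
		DataCenter:      dataCenter,
		SaveSmallInline: false, // the S3 API always creates chunks
		MimeType:        mimeType,
		AssignFunc:      assignFunc,
	})
	if err != nil {
		return nil, err // the real code also deletes any orphaned chunks here
	}

	// Persist the metadata entry that references the uploaded chunks.
	now := time.Now().Unix()
	entry := &filer_pb.Entry{
		Name:   filepath.Base(filePath),
		Chunks: chunkResult.FileChunks,
		Attributes: &filer_pb.FuseAttributes{
			Crtime:   now,
			Mtime:    now,
			FileMode: 0660,
			Mime:     mimeType,
			FileSize: uint64(chunkResult.TotalSize),
		},
		Extended: map[string][]byte{},
	}
	_, err = client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
		Directory: filepath.Dir(filePath),
		Entry:     entry,
	})
	return entry, err
}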
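
Because the new path always encrypts from offset 0 and only afterwards splits the stream into chunks, every chunk carries its own SSE metadata so a later GET can decrypt it without reading the preceding chunks: SSE-C records the chunk's PartOffset, SSE-KMS records a ChunkOffset, and SSE-S3 stores a per-chunk IV produced by calculateIVWithOffset from the base IV and the chunk offset. That helper is not shown in this diff; the sketch below assumes the usual AES-CTR construction of advancing the IV's big-endian counter by offset/16 blocks, which may differ from the actual implementation.

package s3api

import (
	"crypto/aes"
	"fmt"
)

// ivWithOffset derives a chunk-specific AES-CTR IV from a base IV and a byte
// offset by adding offset/16 to the IV interpreted as a 128-bit big-endian
// counter. Chunk offsets here are multiples of 8 MiB, so they are always
// block-aligned. This is an assumed stand-in for calculateIVWithOffset.
func ivWithOffset(baseIV []byte, offset int64) ([]byte, error) {
	if len(baseIV) != aes.BlockSize {
		return nil, fmt.Errorf("expected %d-byte IV, got %d bytes", aes.BlockSize, len(baseIV))
	}
	if offset%aes.BlockSize != 0 {
		return nil, fmt.Errorf("offset %d is not block-aligned", offset)
	}
	iv := make([]byte, aes.BlockSize)
	copy(iv, baseIV)
	carry := uint64(offset / aes.BlockSize)
	for i := aes.BlockSize - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + (carry & 0xFF)
		iv[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8)
	}
	return iv, nil
}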
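
With the filer response gone, the ETag is now computed on the S3 side with filer.ETag(entry): multipart parts and single-chunk objects get a plain MD5 ETag from entry.Attributes.Md5, while multi-chunk regular objects fall through to filer.ETagChunks() and its composite "<hash>-<count>" form. The snippet below illustrates that composite convention using the standard S3 multipart rule (MD5 over the concatenated per-chunk binary MD5s, suffixed with the chunk count); the exact hashing inside ETagChunks is an assumption, not something this diff shows.

package s3api

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// compositeETag illustrates the "<hash>-<count>" ETag form for multi-chunk
// objects, modeled on the S3 multipart convention. filer.ETag/ETagChunks may
// differ in detail; this is an assumed sketch, not the library code.
func compositeETag(chunkMD5s [][]byte) string {
	if len(chunkMD5s) == 1 {
		return hex.EncodeToString(chunkMD5s[0]) // single chunk: plain MD5
	}
	h := md5.New()
	for _, sum := range chunkMD5s {
		h.Write(sum)
	}
	return fmt.Sprintf("%s-%d", hex.EncodeToString(h.Sum(nil)), len(chunkMD5s))
}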
