Diffstat (limited to 'weed/s3api/s3api_object_handlers_put.go')
-rw-r--r--  weed/s3api/s3api_object_handlers_put.go | 694 +++++++++++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 507 insertions(+), 187 deletions(-)
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 6ce48429f..f7105052e 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -1,25 +1,28 @@
package s3api
import (
- "crypto/md5"
+ "context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
+ "net/url"
+ "path/filepath"
"strconv"
"strings"
"time"
"github.com/pquerna/cachecontrol/cacheobject"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security"
- weed_server "github.com/seaweedfs/seaweedfs/weed/server"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util/constants"
)
@@ -60,6 +63,13 @@ type BucketDefaultEncryptionResult struct {
SSEKMSKey *SSEKMSKey
}
+// SSEResponseMetadata holds encryption metadata needed for HTTP response headers
+type SSEResponseMetadata struct {
+ SSEType string
+ KMSKeyID string
+ BucketKeyEnabled bool
+}
+
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
@@ -135,7 +145,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
versioningEnabled := (versioningState == s3_constants.VersioningEnabled)
versioningConfigured := (versioningState != "")
- glog.V(2).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
+ glog.V(3).Infof("PutObjectHandler: bucket=%s, object=%s, versioningState='%s', versioningEnabled=%v, versioningConfigured=%v", bucket, object, versioningState, versioningEnabled, versioningConfigured)
// Validate object lock headers before processing
if err := s3a.validateObjectLockHeaders(r, versioningEnabled); err != nil {
@@ -158,29 +168,34 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
switch versioningState {
case s3_constants.VersioningEnabled:
// Handle enabled versioning - create new versions with real version IDs
- glog.V(0).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
- versionId, etag, errCode := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
+ glog.V(3).Infof("PutObjectHandler: ENABLED versioning detected for %s/%s, calling putVersionedObject", bucket, object)
+ versionId, etag, errCode, sseMetadata := s3a.putVersionedObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectHandler: putVersionedObject failed with errCode=%v for %s/%s", errCode, bucket, object)
s3err.WriteErrorResponse(w, r, errCode)
return
}
- glog.V(0).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
+ glog.V(3).Infof("PutObjectHandler: putVersionedObject returned versionId=%s, etag=%s for %s/%s", versionId, etag, bucket, object)
// Set version ID in response header
if versionId != "" {
w.Header().Set("x-amz-version-id", versionId)
- glog.V(0).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
+ glog.V(3).Infof("PutObjectHandler: set x-amz-version-id header to %s for %s/%s", versionId, bucket, object)
} else {
glog.Errorf("PutObjectHandler: CRITICAL - versionId is EMPTY for versioned bucket %s, object %s", bucket, object)
}
// Set ETag in response
setEtag(w, etag)
+
+ // Set SSE response headers for versioned objects
+ s3a.setSSEResponseHeaders(w, r, sseMetadata)
+
case s3_constants.VersioningSuspended:
// Handle suspended versioning - overwrite with "null" version ID but preserve existing versions
- etag, errCode := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
+ glog.V(3).Infof("PutObjectHandler: SUSPENDED versioning detected for %s/%s, calling putSuspendedVersioningObject", bucket, object)
+ etag, errCode, sseMetadata := s3a.putSuspendedVersioningObject(r, bucket, object, dataReader, objectContentType)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
@@ -191,6 +206,9 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
// Set ETag in response
setEtag(w, etag)
+
+ // Set SSE response headers for suspended versioning
+ s3a.setSSEResponseHeaders(w, r, sseMetadata)
default:
// Handle regular PUT (never configured versioning)
uploadUrl := s3a.toFilerUrl(bucket, object)
@@ -198,7 +216,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
dataReader = mimeDetect(r, dataReader)
}
- etag, errCode, sseType := s3a.putToFiler(r, uploadUrl, dataReader, "", bucket, 1)
+ etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
@@ -209,9 +227,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
setEtag(w, etag)
// Set SSE response headers based on encryption type used
- if sseType == s3_constants.SSETypeS3 {
- w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
- }
+ s3a.setSSEResponseHeaders(w, r, sseMetadata)
}
}
stats_collect.RecordBucketActiveTime(bucket)
@@ -220,15 +236,18 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeSuccessResponseEmpty(w, r)
}
-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, destination string, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseType string) {
- // Calculate unique offset for each part to prevent IV reuse in multipart uploads
- // This is critical for CTR mode encryption security
- partOffset := calculatePartOffset(partNumber)
+func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+ // Optimization: write directly to volume servers, bypassing the filer proxy
+ // This eliminates filer-proxy overhead for PUT operations
+
+ // For SSE, encrypt with offset=0 for all parts
+ // Each part is encrypted independently, then decrypted using metadata during GET
+ partOffset := int64(0)
- // Handle all SSE encryption types in a unified manner to eliminate repetitive dataReader assignments
+ // Handle all SSE encryption types in a unified manner
sseResult, sseErrorCode := s3a.handleAllSSEEncryption(r, dataReader, partOffset)
if sseErrorCode != s3err.ErrNone {
- return "", sseErrorCode, ""
+ return "", sseErrorCode, SSEResponseMetadata{}
}
// Extract results from unified SSE handling
@@ -239,6 +258,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
sseKMSMetadata := sseResult.SSEKMSMetadata
sseS3Key := sseResult.SSES3Key
sseS3Metadata := sseResult.SSES3Metadata
+ sseType := sseResult.SSEType
// Apply bucket default encryption if no explicit encryption was provided
// This implements AWS S3 behavior where bucket default encryption automatically applies
@@ -249,7 +269,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
encryptionResult, applyErr := s3a.applyBucketDefaultEncryption(bucket, r, dataReader)
if applyErr != nil {
glog.Errorf("Failed to apply bucket default encryption: %v", applyErr)
- return "", s3err.ErrInternalError, ""
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Update variables based on the result
@@ -257,121 +277,357 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
sseS3Key = encryptionResult.SSES3Key
sseKMSKey = encryptionResult.SSEKMSKey
+ // If bucket-default encryption selected an algorithm, reflect it in SSE type
+ if sseType == "" {
+ if sseS3Key != nil {
+ sseType = s3_constants.SSETypeS3
+ } else if sseKMSKey != nil {
+ sseType = s3_constants.SSETypeKMS
+ }
+ }
+
// If SSE-S3 was applied by bucket default, prepare metadata (if not already done)
if sseS3Key != nil && len(sseS3Metadata) == 0 {
var metaErr error
sseS3Metadata, metaErr = SerializeSSES3Metadata(sseS3Key)
if metaErr != nil {
glog.Errorf("Failed to serialize SSE-S3 metadata for bucket default encryption: %v", metaErr)
- return "", s3err.ErrInternalError, ""
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
}
} else {
glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
}
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
+ // Parse the upload URL to extract the file path
+ // uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.)
+ // Use proper URL parsing instead of string manipulation for robustness
+ parsedUrl, parseErr := url.Parse(uploadUrl)
+ if parseErr != nil {
+ glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr)
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
+ }
+
+ // Use parsedUrl.Path directly - it's already decoded by url.Parse()
+ // Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/"
+ // Calling PathUnescape again would double-decode and fail on keys like "b%ar"
+ filePath := parsedUrl.Path
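
A side note on the decode-once behavior relied on above: url.Parse performs the single decoding itself, so a second PathUnescape would corrupt keys containing a literal percent sign. A standalone sketch, using a hypothetical URL that is not part of this change:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// url.Parse stores Path in decoded form: "%25" becomes "%".
	u, err := url.Parse("http://filer:8888/bucket/b%25ar")
	if err != nil {
		panic(err)
	}
	fmt.Println(u.Path) // "/bucket/b%ar" (decoded exactly once)

	// Decoding again fails, because "%ar" is not a valid percent escape.
	if _, err := url.PathUnescape(u.Path); err != nil {
		fmt.Println("double-decode error:", err)
	}
}
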
- proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body)
+ // Step 1 & 2: Use auto-chunking to handle large files without OOM
+ // This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
+ const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
+ const smallFileLimit = 256 * 1024 // 256KB - store inline in filer
+ collection := ""
+ if s3a.option.FilerGroup != "" {
+ collection = s3a.getCollectionName(bucket)
+ }
+
+ // Create assign function for chunked upload
+ assignFunc := func(ctx context.Context, count int) (*operation.VolumeAssignRequest, *operation.AssignResult, error) {
+ var assignResult *filer_pb.AssignVolumeResponse
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.AssignVolume(ctx, &filer_pb.AssignVolumeRequest{
+ Count: int32(count),
+ Replication: "",
+ Collection: collection,
+ DiskType: "",
+ DataCenter: s3a.option.DataCenter,
+ Path: filePath,
+ })
+ if err != nil {
+ return fmt.Errorf("assign volume: %w", err)
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume: %v", resp.Error)
+ }
+ assignResult = resp
+ return nil
+ })
+ if err != nil {
+ return nil, nil, err
+ }
+
+ // Convert filer_pb.AssignVolumeResponse to operation.AssignResult
+ return nil, &operation.AssignResult{
+ Fid: assignResult.FileId,
+ Url: assignResult.Location.Url,
+ PublicUrl: assignResult.Location.PublicUrl,
+ Count: uint64(count),
+ Auth: security.EncodedJwt(assignResult.Auth),
+ }, nil
+ }
+
+ // Upload with auto-chunking
+ // Use context.Background() to ensure chunk uploads complete even if HTTP request is cancelled
+ // This prevents partial uploads and data corruption
+ chunkResult, err := operation.UploadReaderInChunks(context.Background(), dataReader, &operation.ChunkedUploadOption{
+ ChunkSize: chunkSize,
+ SmallFileLimit: smallFileLimit,
+ Collection: collection,
+ DataCenter: s3a.option.DataCenter,
+ SaveSmallInline: false, // S3 API always creates chunks, never stores inline
+ MimeType: r.Header.Get("Content-Type"),
+ AssignFunc: assignFunc,
+ })
if err != nil {
- glog.Errorf("NewRequest %s: %v", uploadUrl, err)
- return "", s3err.ErrInternalError, ""
- }
+ glog.Errorf("putToFiler: chunked upload failed: %v", err)
+
+ // CRITICAL: Clean up orphaned chunks before returning the error.
+ // UploadReaderInChunks now returns partial results even on error,
+ // allowing us to clean up any chunks that were successfully uploaded
+ // before the failure occurred
+ if chunkResult != nil && len(chunkResult.FileChunks) > 0 {
+ glog.Warningf("putToFiler: Upload failed, attempting to clean up %d orphaned chunks", len(chunkResult.FileChunks))
+ s3a.deleteOrphanedChunks(chunkResult.FileChunks)
+ }
- proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
- if destination != "" {
- proxyReq.Header.Set(s3_constants.SeaweedStorageDestinationHeader, destination)
+ if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
+ return "", s3err.ErrInvalidDigest, SSEResponseMetadata{}
+ }
+ return "", s3err.ErrInternalError, SSEResponseMetadata{}
}
- if s3a.option.FilerGroup != "" {
- query := proxyReq.URL.Query()
- query.Add("collection", s3a.getCollectionName(bucket))
- proxyReq.URL.RawQuery = query.Encode()
- }
+ // Step 3: Calculate MD5 hash and add SSE metadata to chunks
+ md5Sum := chunkResult.Md5Hash.Sum(nil)
- for header, values := range r.Header {
- for _, value := range values {
- proxyReq.Header.Add(header, value)
+ glog.V(4).Infof("putToFiler: Chunked upload SUCCESS - path=%s, chunks=%d, size=%d",
+ filePath, len(chunkResult.FileChunks), chunkResult.TotalSize)
+
+ // Log chunk details for debugging (verbose only - high frequency)
+ if glog.V(4) {
+ for i, chunk := range chunkResult.FileChunks {
+ glog.Infof(" PUT Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size)
}
}
- // Log version ID header for debugging
- if versionIdHeader := proxyReq.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
- glog.V(0).Infof("putToFiler: version ID header set: %s=%s for %s", s3_constants.ExtVersionIdKey, versionIdHeader, uploadUrl)
+ // Add SSE metadata to all chunks if present
+ for _, chunk := range chunkResult.FileChunks {
+ switch {
+ case customerKey != nil:
+ // SSE-C: Create per-chunk metadata (matches filer logic)
+ chunk.SseType = filer_pb.SSEType_SSE_C
+ if len(sseIV) > 0 {
+ // PartOffset tracks position within the encrypted stream
+ // Since ALL uploads (single-part and multipart parts) encrypt starting from offset 0,
+ // PartOffset = chunk.Offset represents where this chunk is in that encrypted stream
+ // - Single-part: chunk.Offset is position in the file's encrypted stream
+ // - Multipart: chunk.Offset is position in this part's encrypted stream
+ ssecMetadataStruct := struct {
+ Algorithm string `json:"algorithm"`
+ IV string `json:"iv"`
+ KeyMD5 string `json:"keyMD5"`
+ PartOffset int64 `json:"partOffset"`
+ }{
+ Algorithm: "AES256",
+ IV: base64.StdEncoding.EncodeToString(sseIV),
+ KeyMD5: customerKey.KeyMD5,
+ PartOffset: chunk.Offset, // Position within the encrypted stream (always encrypted from 0)
+ }
+ if ssecMetadata, serErr := json.Marshal(ssecMetadataStruct); serErr == nil {
+ chunk.SseMetadata = ssecMetadata
+ }
+ }
+ case sseKMSKey != nil:
+ // SSE-KMS: Create per-chunk metadata with chunk-specific offsets
+ // Each chunk needs its own metadata with ChunkOffset set for proper IV calculation during decryption
+ chunk.SseType = filer_pb.SSEType_SSE_KMS
+
+ // Create a copy of the SSE-KMS key with chunk-specific offset
+ chunkSSEKey := &SSEKMSKey{
+ KeyID: sseKMSKey.KeyID,
+ EncryptedDataKey: sseKMSKey.EncryptedDataKey,
+ EncryptionContext: sseKMSKey.EncryptionContext,
+ BucketKeyEnabled: sseKMSKey.BucketKeyEnabled,
+ IV: sseKMSKey.IV,
+ ChunkOffset: chunk.Offset, // Set chunk-specific offset for IV calculation
+ }
+
+ // Serialize per-chunk metadata
+ if chunkMetadata, serErr := SerializeSSEKMSMetadata(chunkSSEKey); serErr == nil {
+ chunk.SseMetadata = chunkMetadata
+ } else {
+ glog.Errorf("Failed to serialize SSE-KMS metadata for chunk at offset %d: %v", chunk.Offset, serErr)
+ }
+ case sseS3Key != nil:
+ // SSE-S3: Create per-chunk metadata with chunk-specific IVs
+ // Each chunk needs its own IV calculated from the base IV + chunk offset
+ chunk.SseType = filer_pb.SSEType_SSE_S3
+
+ // Calculate chunk-specific IV using base IV and chunk offset
+ chunkIV, _ := calculateIVWithOffset(sseS3Key.IV, chunk.Offset)
+
+ // Create a copy of the SSE-S3 key with chunk-specific IV
+ chunkSSEKey := &SSES3Key{
+ Key: sseS3Key.Key,
+ KeyID: sseS3Key.KeyID,
+ Algorithm: sseS3Key.Algorithm,
+ IV: chunkIV, // Use chunk-specific IV
+ }
+
+ // Serialize per-chunk metadata
+ if chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey); serErr == nil {
+ chunk.SseMetadata = chunkMetadata
+ } else {
+ glog.Errorf("Failed to serialize SSE-S3 metadata for chunk at offset %d: %v", chunk.Offset, serErr)
+ }
+ }
}
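
calculateIVWithOffset is defined elsewhere in this package. As a minimal sketch of the technique its name suggests, assuming the conventional CTR-mode construction where the 16-byte IV is treated as a big-endian counter and advanced by one per AES block, a chunk-specific IV can be derived as below; the committed implementation may differ in details:

package main

import (
	"crypto/aes"
	"encoding/hex"
	"fmt"
)

// ivWithOffset sketches deriving a chunk-specific CTR IV from a base IV:
// treat the IV as a big-endian counter and add one per AES block covered
// by offset. Assumes a 16-byte IV and a block-aligned offset, which holds
// here because chunk offsets are multiples of the 8MB chunk size.
// Illustration only; the committed calculateIVWithOffset is authoritative.
func ivWithOffset(baseIV []byte, offset int64) []byte {
	iv := make([]byte, aes.BlockSize)
	copy(iv, baseIV)
	carry := uint64(offset / aes.BlockSize) // number of blocks to skip
	for i := aes.BlockSize - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + (carry & 0xff)
		iv[i] = byte(sum)
		carry = (carry >> 8) + (sum >> 8)
	}
	return iv
}

func main() {
	base := make([]byte, aes.BlockSize) // zero IV for demonstration
	// A chunk at byte offset 8MiB starts 524288 (0x80000) blocks in.
	fmt.Println(hex.EncodeToString(ivWithOffset(base, 8*1024*1024)))
	// prints 00000000000000000000000000080000
}
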
- // Set object owner header for filer to extract
+ // Step 4: Create metadata entry
+ now := time.Now()
+ mimeType := r.Header.Get("Content-Type")
+ if mimeType == "" {
+ mimeType = "application/octet-stream"
+ }
+
+ // Create entry
+ entry := &filer_pb.Entry{
+ Name: filepath.Base(filePath),
+ IsDirectory: false,
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: now.Unix(),
+ Mtime: now.Unix(),
+ FileMode: 0660,
+ Uid: 0,
+ Gid: 0,
+ Mime: mimeType,
+ FileSize: uint64(chunkResult.TotalSize),
+ },
+ Chunks: chunkResult.FileChunks, // All chunks from auto-chunking
+ Extended: make(map[string][]byte),
+ }
+
+ // Set Md5 attribute based on context:
+ // 1. For multipart upload PARTS (stored in .uploads/ directory): ALWAYS set Md5
+ // - Parts must use simple MD5 ETags, never composite format
+ // - Even if a part has multiple chunks internally, its ETag is MD5 of entire part
+ // 2. For regular object uploads: only set Md5 for single-chunk uploads
+ // - Multi-chunk regular objects use composite "md5-count" format
+ isMultipartPart := strings.Contains(filePath, "/"+s3_constants.MultipartUploadsFolder+"/")
+ if isMultipartPart || len(chunkResult.FileChunks) == 1 {
+ entry.Attributes.Md5 = md5Sum
+ }
+
+ // Calculate ETag using the same logic as GET to ensure consistency
+ // For single chunk: uses entry.Attributes.Md5
+ // For multiple chunks: uses filer.ETagChunks() which returns "<hash>-<count>"
+ etag = filer.ETag(entry)
+ glog.V(4).Infof("putToFiler: Calculated ETag=%s for %d chunks", etag, len(chunkResult.FileChunks))
+
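
filer.ETag and filer.ETagChunks live in the filer package; for multi-chunk entries the "<hash>-<count>" format follows the S3 multipart convention: MD5 over the concatenated raw per-part digests, suffixed with the part count. A small sketch of that convention (illustrative only; filer.ETag remains authoritative):

package main

import (
	"crypto/md5"
	"fmt"
)

// compositeETag sketches the S3-style multi-part ETag: MD5 over the
// concatenation of each part's raw MD5 digest, plus a "-<count>" suffix.
func compositeETag(partDigests [][]byte) string {
	h := md5.New()
	for _, d := range partDigests {
		h.Write(d)
	}
	return fmt.Sprintf("%x-%d", h.Sum(nil), len(partDigests))
}

func main() {
	part1 := md5.Sum([]byte("first 8MB chunk"))
	part2 := md5.Sum([]byte("second 8MB chunk"))
	fmt.Println(compositeETag([][]byte{part1[:], part2[:]}))
	// The "-2" suffix tells clients this is not a plain MD5 of the body.
}
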
+ // Set object owner
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
- proxyReq.Header.Set(s3_constants.ExtAmzOwnerKey, amzAccountId)
- glog.V(2).Infof("putToFiler: setting owner header %s for object %s", amzAccountId, uploadUrl)
+ entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
+ glog.V(2).Infof("putToFiler: setting owner %s for object %s", amzAccountId, filePath)
+ }
+
+ // Set version ID if present
+ if versionIdHeader := r.Header.Get(s3_constants.ExtVersionIdKey); versionIdHeader != "" {
+ entry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionIdHeader)
+ glog.V(3).Infof("putToFiler: setting version ID %s for object %s", versionIdHeader, filePath)
+ }
+
+ // Set TTL-based S3 expiry flag only if object has a TTL
+ if entry.Attributes.TtlSec > 0 {
+ entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+ }
+
+ // Copy user metadata and standard headers
+ for k, v := range r.Header {
+ if len(v) > 0 && len(v[0]) > 0 {
+ if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
+ // Go's HTTP server canonicalizes headers (e.g., x-amz-meta-foo → X-Amz-Meta-Foo)
+ // We store them as they come in (after canonicalization) to preserve the user's intent
+ entry.Extended[k] = []byte(v[0])
+ } else if k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
+ entry.Extended[k] = []byte(v[0])
+ }
+ if k == "Response-Content-Disposition" {
+ entry.Extended["Content-Disposition"] = []byte(v[0])
+ }
+ }
}
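
The canonicalization mentioned above is performed by net/textproto inside net/http; a quick sketch of the key form that user-metadata headers arrive in:

package main

import (
	"fmt"
	"net/textproto"
)

func main() {
	// net/http applies this canonicalization to incoming request headers,
	// so "x-amz-meta-foo" appears in r.Header as "X-Amz-Meta-Foo".
	fmt.Println(textproto.CanonicalMIMEHeaderKey("x-amz-meta-foo"))
	// Output: X-Amz-Meta-Foo
}
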
- // Set SSE-C metadata headers for the filer if encryption was applied
+ // Set SSE-C metadata
if customerKey != nil && len(sseIV) > 0 {
- proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, "AES256")
- proxyReq.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, customerKey.KeyMD5)
- // Store IV in a custom header that the filer can use to store in entry metadata
- proxyReq.Header.Set(s3_constants.SeaweedFSSSEIVHeader, base64.StdEncoding.EncodeToString(sseIV))
+ // Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+ entry.Extended[s3_constants.SeaweedFSSSEIV] = sseIV
+ entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
+ entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(customerKey.KeyMD5)
+ glog.V(3).Infof("putToFiler: storing SSE-C metadata - IV len=%d", len(sseIV))
}
- // Set SSE-KMS metadata headers for the filer if KMS encryption was applied
+ // Set SSE-KMS metadata
if sseKMSKey != nil {
- // Use already-serialized SSE-KMS metadata from helper function
- // Store serialized KMS metadata in a custom header that the filer can use
- proxyReq.Header.Set(s3_constants.SeaweedFSSSEKMSKeyHeader, base64.StdEncoding.EncodeToString(sseKMSMetadata))
-
- glog.V(3).Infof("putToFiler: storing SSE-KMS metadata for object %s with keyID %s", uploadUrl, sseKMSKey.KeyID)
- } else {
- glog.V(4).Infof("putToFiler: no SSE-KMS encryption detected")
+ // Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+ entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = sseKMSMetadata
+ // Set standard SSE headers for detection
+ entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")
+ entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId] = []byte(sseKMSKey.KeyID)
+ glog.V(3).Infof("putToFiler: storing SSE-KMS metadata - keyID=%s, raw len=%d", sseKMSKey.KeyID, len(sseKMSMetadata))
}
- // Set SSE-S3 metadata headers for the filer if S3 encryption was applied
+ // Set SSE-S3 metadata
if sseS3Key != nil && len(sseS3Metadata) > 0 {
- // Store serialized S3 metadata in a custom header that the filer can use
- proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
- glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
- }
- // Set TTL-based S3 expiry (modification time)
- proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true")
- // ensure that the Authorization header is overriding any previous
- // Authorization header which might be already present in proxyReq
- s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
- resp, postErr := s3a.client.Do(proxyReq)
-
- if postErr != nil {
- glog.Errorf("post to filer: %v", postErr)
- if strings.Contains(postErr.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
- return "", s3err.ErrInvalidDigest, ""
+ // Store metadata as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
+ entry.Extended[s3_constants.SeaweedFSSSES3Key] = sseS3Metadata
+ // Set standard SSE header for detection
+ entry.Extended[s3_constants.AmzServerSideEncryption] = []byte("AES256")
+ glog.V(3).Infof("putToFiler: storing SSE-S3 metadata - keyID=%s, raw len=%d", sseS3Key.KeyID, len(sseS3Metadata))
+ }
+
+ // Step 5: Save metadata to filer via gRPC
+ // Use context.Background() to ensure metadata save completes even if HTTP request is cancelled
+ // This matches the chunk upload behavior and prevents orphaned chunks
+ glog.V(3).Infof("putToFiler: About to create entry - dir=%s, name=%s, chunks=%d, extended keys=%d",
+ filepath.Dir(filePath), filepath.Base(filePath), len(entry.Chunks), len(entry.Extended))
+ createErr := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ req := &filer_pb.CreateEntryRequest{
+ Directory: filepath.Dir(filePath),
+ Entry: entry,
+ }
+ glog.V(3).Infof("putToFiler: Calling CreateEntry for %s", filePath)
+ _, err := client.CreateEntry(context.Background(), req)
+ if err != nil {
+ glog.Errorf("putToFiler: CreateEntry returned error: %v", err)
}
- return "", s3err.ErrInternalError, ""
+ return err
+ })
+ if createErr != nil {
+ glog.Errorf("putToFiler: failed to create entry for %s: %v", filePath, createErr)
+
+ // CRITICAL: Clean up orphaned chunks before returning the error.
+ // If CreateEntry fails, the uploaded chunks are orphaned and must be deleted
+ // to prevent resource leaks and wasted storage
+ if len(chunkResult.FileChunks) > 0 {
+ glog.Warningf("putToFiler: CreateEntry failed, attempting to clean up %d orphaned chunks", len(chunkResult.FileChunks))
+ s3a.deleteOrphanedChunks(chunkResult.FileChunks)
+ }
+
+ return "", filerErrorToS3Error(createErr.Error()), SSEResponseMetadata{}
}
- defer resp.Body.Close()
+ glog.V(3).Infof("putToFiler: CreateEntry SUCCESS for %s", filePath)
- etag = fmt.Sprintf("%x", hash.Sum(nil))
+ glog.V(2).Infof("putToFiler: Metadata saved SUCCESS - path=%s, etag(hex)=%s, size=%d, partNumber=%d",
+ filePath, etag, entry.Attributes.FileSize, partNumber)
- resp_body, ra_err := io.ReadAll(resp.Body)
- if ra_err != nil {
- glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
- return etag, s3err.ErrInternalError, ""
- }
- var ret weed_server.FilerPostResult
- unmarshal_err := json.Unmarshal(resp_body, &ret)
- if unmarshal_err != nil {
- glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
- return "", s3err.ErrInternalError, ""
- }
- if ret.Error != "" {
- glog.Errorf("upload to filer error: %v", ret.Error)
- return "", filerErrorToS3Error(ret.Error), ""
+ BucketTrafficReceived(chunkResult.TotalSize, r)
+
+ // Build SSE response metadata with encryption details
+ responseMetadata := SSEResponseMetadata{
+ SSEType: sseType,
}
- BucketTrafficReceived(ret.Size, r)
+ // For SSE-KMS, include key ID and bucket-key-enabled flag from stored metadata
+ if sseKMSKey != nil {
+ responseMetadata.KMSKeyID = sseKMSKey.KeyID
+ responseMetadata.BucketKeyEnabled = sseKMSKey.BucketKeyEnabled
+ glog.V(4).Infof("putToFiler: returning SSE-KMS metadata - keyID=%s, bucketKeyEnabled=%v",
+ sseKMSKey.KeyID, sseKMSKey.BucketKeyEnabled)
+ }
- // Return the SSE type determined by the unified handler
- return etag, s3err.ErrNone, sseResult.SSEType
+ return etag, s3err.ErrNone, responseMetadata
}
func setEtag(w http.ResponseWriter, etag string) {
@@ -384,6 +640,43 @@ func setEtag(w http.ResponseWriter, etag string) {
}
}
+// setSSEResponseHeaders sets appropriate SSE response headers based on encryption type
+func (s3a *S3ApiServer) setSSEResponseHeaders(w http.ResponseWriter, r *http.Request, sseMetadata SSEResponseMetadata) {
+ switch sseMetadata.SSEType {
+ case s3_constants.SSETypeS3:
+ // SSE-S3: Return the encryption algorithm
+ w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
+
+ case s3_constants.SSETypeC:
+ // SSE-C: Echo back the customer-provided algorithm and key MD5
+ if algo := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm); algo != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, algo)
+ }
+ if keyMD5 := r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5); keyMD5 != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, keyMD5)
+ }
+
+ case s3_constants.SSETypeKMS:
+ // SSE-KMS: Return the KMS key ID and algorithm
+ w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
+
+ // Use metadata from stored encryption config (for bucket-default encryption)
+ // or fall back to request headers (for explicit encryption)
+ if sseMetadata.KMSKeyID != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseMetadata.KMSKeyID)
+ } else if keyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId); keyID != "" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, keyID)
+ }
+
+ // Set bucket-key-enabled header if it was enabled
+ if sseMetadata.BucketKeyEnabled {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
+ } else if bucketKeyEnabled := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyEnabled == "true" {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
+ }
+ }
+}
+
func filerErrorToS3Error(errString string) s3err.ErrorCode {
switch {
case errString == constants.ErrMsgBadDigest:
@@ -400,26 +693,6 @@ func filerErrorToS3Error(errString string) s3err.ErrorCode {
}
}
-func (s3a *S3ApiServer) maybeAddFilerJwtAuthorization(r *http.Request, isWrite bool) {
- encodedJwt := s3a.maybeGetFilerJwtAuthorizationToken(isWrite)
-
- if encodedJwt == "" {
- return
- }
-
- r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
-}
-
-func (s3a *S3ApiServer) maybeGetFilerJwtAuthorizationToken(isWrite bool) string {
- var encodedJwt security.EncodedJwt
- if isWrite {
- encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.SigningKey, s3a.filerGuard.ExpiresAfterSec)
- } else {
- encodedJwt = security.GenJwtForFilerServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec)
- }
- return string(encodedJwt)
-}
-
// setObjectOwnerFromRequest sets the object owner metadata based on the authenticated user
func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_pb.Entry) {
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
@@ -446,19 +719,12 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
//
// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
-func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// Normalize object path to ensure consistency with toFilerUrl behavior
normalizedObject := removeDuplicateSlashes(object)
- // Enable detailed logging for testobjbar
- isTestObj := (normalizedObject == "testobjbar")
-
- glog.V(0).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
- bucket, object, normalizedObject, isTestObj)
-
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
- }
+ glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s",
+ bucket, object, normalizedObject)
bucketDir := s3a.option.BucketsPath + "/" + bucket
@@ -470,20 +736,20 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
entries, _, err := s3a.list(versionsDir, "", "", false, 1000)
if err == nil {
// .versions directory exists
- glog.V(0).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
+ glog.V(3).Infof("putSuspendedVersioningObject: found %d entries in .versions for %s/%s", len(entries), bucket, object)
for _, entry := range entries {
if entry.Extended != nil {
if versionIdBytes, ok := entry.Extended[s3_constants.ExtVersionIdKey]; ok {
versionId := string(versionIdBytes)
- glog.V(0).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
+ glog.V(3).Infof("putSuspendedVersioningObject: found version '%s' in .versions", versionId)
if versionId == "null" {
// Only delete null version - preserve real versioned entries
- glog.V(0).Infof("putSuspendedVersioningObject: deleting null version from .versions")
+ glog.V(3).Infof("putSuspendedVersioningObject: deleting null version from .versions")
err := s3a.rm(versionsDir, entry.Name, true, false)
if err != nil {
glog.Warningf("putSuspendedVersioningObject: failed to delete null version: %v", err)
} else {
- glog.V(0).Infof("putSuspendedVersioningObject: successfully deleted null version")
+ glog.V(3).Infof("putSuspendedVersioningObject: successfully deleted null version")
}
break
}
@@ -491,13 +757,12 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
}
} else {
- glog.V(0).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
+ glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
}
uploadUrl := s3a.toFilerUrl(bucket, normalizedObject)
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
+ body := dataReader
if objectContentType == "" {
body = mimeDetect(r, body)
}
@@ -508,10 +773,6 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
// Set version ID to "null" for suspended versioning
r.Header.Set(s3_constants.ExtVersionIdKey, "null")
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
- s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey))
- }
// Extract and set object lock metadata as headers
// This handles retention mode, retention date, and legal hold
@@ -528,7 +789,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
parsedTime, err := time.Parse(time.RFC3339, explicitRetainUntilDate)
if err != nil {
glog.Errorf("putSuspendedVersioningObject: failed to parse retention until date: %v", err)
- return "", s3err.ErrInvalidRequest
+ return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
r.Header.Set(s3_constants.ExtRetentionUntilDateKey, strconv.FormatInt(parsedTime.Unix(), 10))
glog.V(2).Infof("putSuspendedVersioningObject: setting retention until date header (timestamp: %d)", parsedTime.Unix())
@@ -540,7 +801,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
glog.V(2).Infof("putSuspendedVersioningObject: setting legal hold header: %s", legalHold)
} else {
glog.Errorf("putSuspendedVersioningObject: invalid legal hold value: %s", legalHold)
- return "", s3err.ErrInvalidRequest
+ return "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
}
@@ -562,43 +823,10 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
// Upload the file using putToFiler - this will create the file with version metadata
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: calling putToFiler ===")
- }
- etag, errCode, _ = s3a.putToFiler(r, uploadUrl, body, "", bucket, 1)
+ etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
- return "", errCode
- }
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
- }
-
- // Verify the metadata was set correctly during file creation
- if isTestObj {
- // Read back the entry to verify
- maxRetries := 3
- for attempt := 1; attempt <= maxRetries; attempt++ {
- verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject)
- if verifyErr == nil {
- glog.V(0).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
- if verifyEntry.Extended != nil {
- if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok {
- glog.V(0).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
- } else {
- glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
- }
- } else {
- glog.V(0).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
- }
- break
- } else {
- glog.V(0).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
- }
- if attempt < maxRetries {
- time.Sleep(time.Millisecond * 10)
- }
- }
+ return "", errCode, SSEResponseMetadata{}
}
// Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
@@ -609,10 +837,8 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
- if isTestObj {
- glog.V(0).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
- }
- return etag, s3err.ErrNone
+
+ return etag, s3err.ErrNone, sseMetadata
}
// updateIsLatestFlagsForSuspendedVersioning sets IsLatest=false on all existing versions/delete markers
@@ -684,7 +910,7 @@ func (s3a *S3ApiServer) updateIsLatestFlagsForSuspendedVersioning(bucket, object
return nil
}
-func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode) {
+func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (versionId string, etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// Generate version ID
versionId = generateVersionId()
@@ -709,21 +935,20 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to create .versions directory: %v", err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
- hash := md5.New()
- var body = io.TeeReader(dataReader, hash)
+ body := dataReader
if objectContentType == "" {
body = mimeDetect(r, body)
}
glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
- etag, errCode, _ = s3a.putToFiler(r, versionUploadUrl, body, "", bucket, 1)
+ etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
- return "", "", errCode
+ return "", "", errCode, SSEResponseMetadata{}
}
// Get the uploaded entry to add versioning metadata
@@ -745,7 +970,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
if err != nil {
glog.Errorf("putVersionedObject: failed to get version entry after %d attempts: %v", maxRetries, err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Add versioning metadata to this version
@@ -766,7 +991,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// Extract and store object lock metadata from request headers
if err := s3a.extractObjectLockMetadataFromRequest(r, versionEntry); err != nil {
glog.Errorf("putVersionedObject: failed to extract object lock metadata: %v", err)
- return "", "", s3err.ErrInvalidRequest
+ return "", "", s3err.ErrInvalidRequest, SSEResponseMetadata{}
}
// Update the version entry with metadata
@@ -777,17 +1002,17 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
})
if err != nil {
glog.Errorf("putVersionedObject: failed to update version metadata: %v", err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
// Update the .versions directory metadata to indicate this is the latest version
err = s3a.updateLatestVersionInDirectory(bucket, normalizedObject, versionId, versionFileName)
if err != nil {
glog.Errorf("putVersionedObject: failed to update latest version in directory: %v", err)
- return "", "", s3err.ErrInternalError
+ return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
}
glog.V(2).Infof("putVersionedObject: successfully created version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
- return versionId, etag, s3err.ErrNone
+ return versionId, etag, s3err.ErrNone, sseMetadata
}
// updateLatestVersionInDirectory updates the .versions directory metadata to indicate the latest version
@@ -897,7 +1122,16 @@ func (s3a *S3ApiServer) extractObjectLockMetadataFromRequest(r *http.Request, en
func (s3a *S3ApiServer) applyBucketDefaultEncryption(bucket string, r *http.Request, dataReader io.Reader) (*BucketDefaultEncryptionResult, error) {
// Check if bucket has default encryption configured
encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket)
- if err != nil || encryptionConfig == nil {
+ if err != nil {
+ // Check if this is just "no encryption configured" vs a real error
+ if errors.Is(err, ErrNoEncryptionConfig) {
+ // No default encryption configured, return original reader
+ return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
+ }
+ // Real error - propagate to prevent silent encryption bypass
+ return nil, fmt.Errorf("failed to read bucket encryption config: %v", err)
+ }
+ if encryptionConfig == nil {
// No default encryption configured, return original reader
return &BucketDefaultEncryptionResult{DataReader: dataReader}, nil
}
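
ErrNoEncryptionConfig is a sentinel defined elsewhere in this package; the check above is the standard Go sentinel-error pattern with errors.Is, sketched here with hypothetical stand-in names:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel mirroring ErrNoEncryptionConfig: callers distinguish
// "not configured" (expected, pass data through) from real lookup failures.
var errNoEncryptionConfig = errors.New("no bucket encryption configuration")

func getBucketEncryptionConfig(bucket string) (string, error) {
	// Stand-in for the real lookup; wrapping with %w keeps errors.Is working.
	return "", fmt.Errorf("bucket %q: %w", bucket, errNoEncryptionConfig)
}

func main() {
	_, err := getBucketEncryptionConfig("demo")
	if errors.Is(err, errNoEncryptionConfig) {
		fmt.Println("no default encryption; return the original reader")
		return
	}
	if err != nil {
		fmt.Println("real error, fail the PUT:", err)
	}
}
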
@@ -963,7 +1197,8 @@ func (s3a *S3ApiServer) applySSEKMSDefaultEncryption(bucket string, r *http.Requ
bucketKeyEnabled := encryptionConfig.BucketKeyEnabled
// Build encryption context for KMS
- bucket, object := s3_constants.GetBucketAndObject(r)
+ // Use bucket parameter passed to function (not from request parsing)
+ _, object := s3_constants.GetBucketAndObject(r)
encryptionContext := BuildEncryptionContext(bucket, object, bucketKeyEnabled)
// Create SSE-KMS encrypted reader
@@ -1474,3 +1709,88 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe
func (s3a *S3ApiServer) checkConditionalHeadersForReads(r *http.Request, bucket, object string) ConditionalHeaderResult {
return s3a.checkConditionalHeadersForReadsWithGetter(s3a, r, bucket, object)
}
+
+// deleteOrphanedChunks attempts to delete chunks that were uploaded but whose entry creation failed
+// This prevents resource leaks and wasted storage. Errors are logged but don't prevent cleanup attempts.
+func (s3a *S3ApiServer) deleteOrphanedChunks(chunks []*filer_pb.FileChunk) {
+ if len(chunks) == 0 {
+ return
+ }
+
+ // Extract file IDs from chunks
+ var fileIds []string
+ for _, chunk := range chunks {
+ if chunk.GetFileIdString() != "" {
+ fileIds = append(fileIds, chunk.GetFileIdString())
+ }
+ }
+
+ if len(fileIds) == 0 {
+ glog.Warningf("deleteOrphanedChunks: no valid file IDs found in %d chunks", len(chunks))
+ return
+ }
+
+ glog.V(3).Infof("deleteOrphanedChunks: attempting to delete %d file IDs: %v", len(fileIds), fileIds)
+
+ // Create a lookup function that queries the filer for volume locations
+ // This is similar to createLookupFileIdFunction but returns the format needed by DeleteFileIdsWithLookupVolumeId
+ lookupFunc := func(vids []string) (map[string]*operation.LookupResult, error) {
+ results := make(map[string]*operation.LookupResult)
+
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Query filer for all volume IDs at once
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: vids,
+ })
+ if err != nil {
+ return err
+ }
+
+ // Convert filer response to operation.LookupResult format
+ for vid, locs := range resp.LocationsMap {
+ result := &operation.LookupResult{
+ VolumeOrFileId: vid,
+ }
+
+ for _, loc := range locs.Locations {
+ result.Locations = append(result.Locations, operation.Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.DataCenter,
+ GrpcPort: int(loc.GrpcPort),
+ })
+ }
+
+ results[vid] = result
+ }
+ return nil
+ })
+
+ return results, err
+ }
+
+ // Attempt deletion using the operation package's batch delete with custom lookup
+ deleteResults := operation.DeleteFileIdsWithLookupVolumeId(s3a.option.GrpcDialOption, fileIds, lookupFunc)
+
+ // Log results - track successes and failures
+ successCount := 0
+ failureCount := 0
+ for _, result := range deleteResults {
+ if result.Error != "" {
+ glog.Warningf("deleteOrphanedChunks: failed to delete chunk %s: %s (status: %d)",
+ result.FileId, result.Error, result.Status)
+ failureCount++
+ } else {
+ glog.V(4).Infof("deleteOrphanedChunks: successfully deleted chunk %s (size: %d bytes)",
+ result.FileId, result.Size)
+ successCount++
+ }
+ }
+
+ if failureCount > 0 {
+ glog.Warningf("deleteOrphanedChunks: cleanup completed with %d successes and %d failures out of %d chunks",
+ successCount, failureCount, len(fileIds))
+ } else {
+ glog.V(3).Infof("deleteOrphanedChunks: successfully deleted all %d orphaned chunks", successCount)
+ }
+}