diff options
Diffstat (limited to 'weed/s3api/s3api_object_retention.go')
| -rw-r--r-- | weed/s3api/s3api_object_retention.go | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_retention.go b/weed/s3api/s3api_object_retention.go new file mode 100644 index 000000000..bedf693ef --- /dev/null +++ b/weed/s3api/s3api_object_retention.go @@ -0,0 +1,598 @@ +package s3api + +import ( + "encoding/xml" + "errors" + "fmt" + "net/http" + "strconv" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +// Sentinel errors for proper error handling instead of string matching +var ( + ErrNoRetentionConfiguration = errors.New("no retention configuration found") + ErrNoLegalHoldConfiguration = errors.New("no legal hold configuration found") + ErrBucketNotFound = errors.New("bucket not found") + ErrObjectNotFound = errors.New("object not found") + ErrVersionNotFound = errors.New("version not found") + ErrLatestVersionNotFound = errors.New("latest version not found") + ErrComplianceModeActive = errors.New("object is under COMPLIANCE mode retention and cannot be deleted or modified") + ErrGovernanceModeActive = errors.New("object is under GOVERNANCE mode retention and cannot be deleted or modified without bypass") +) + +const ( + // Maximum retention period limits according to AWS S3 specifications + MaxRetentionDays = 36500 // Maximum number of days for object retention (100 years) + MaxRetentionYears = 100 // Maximum number of years for object retention +) + +// ObjectRetention represents S3 Object Retention configuration +type ObjectRetention struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Retention"` + Mode string `xml:"Mode,omitempty"` + RetainUntilDate *time.Time `xml:"RetainUntilDate,omitempty"` +} + +// ObjectLegalHold represents S3 Object Legal Hold configuration +type ObjectLegalHold struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LegalHold"` + Status string `xml:"Status,omitempty"` +} + +// ObjectLockConfiguration represents S3 Object Lock Configuration +type ObjectLockConfiguration struct { + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ObjectLockConfiguration"` + ObjectLockEnabled string `xml:"ObjectLockEnabled,omitempty"` + Rule *ObjectLockRule `xml:"Rule,omitempty"` +} + +// ObjectLockRule represents an Object Lock Rule +type ObjectLockRule struct { + XMLName xml.Name `xml:"Rule"` + DefaultRetention *DefaultRetention `xml:"DefaultRetention,omitempty"` +} + +// DefaultRetention represents default retention settings +type DefaultRetention struct { + XMLName xml.Name `xml:"DefaultRetention"` + Mode string `xml:"Mode,omitempty"` + Days int `xml:"Days,omitempty"` + Years int `xml:"Years,omitempty"` +} + +// Custom time unmarshalling for AWS S3 ISO8601 format +func (or *ObjectRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error { + type Alias ObjectRetention + aux := &struct { + *Alias + RetainUntilDate *string `xml:"RetainUntilDate,omitempty"` + }{ + Alias: (*Alias)(or), + } + + if err := d.DecodeElement(aux, &start); err != nil { + return err + } + + if aux.RetainUntilDate != nil { + t, err := time.Parse(time.RFC3339, *aux.RetainUntilDate) + if err != nil { + return err + } + or.RetainUntilDate = &t + } + + return nil +} + +// parseXML is a generic helper function to parse XML from an HTTP request body. +// It uses xml.Decoder for streaming XML parsing, which is more memory-efficient +// and avoids loading the entire request body into memory. +// +// The function assumes: +// - The request body is not nil (returns error if it is) +// - The request body will be closed after parsing (deferred close) +// - The XML content matches the structure of the provided result type T +// +// This approach is optimized for small XML payloads typical in S3 API requests +// (retention configurations, legal hold settings, etc.) where the overhead of +// streaming parsing is acceptable for the memory efficiency benefits. +func parseXML[T any](r *http.Request, result *T) error { + if r.Body == nil { + return fmt.Errorf("error parsing XML: empty request body") + } + defer r.Body.Close() + + decoder := xml.NewDecoder(r.Body) + if err := decoder.Decode(result); err != nil { + return fmt.Errorf("error parsing XML: %v", err) + } + + return nil +} + +// parseObjectRetention parses XML retention configuration from request body +func parseObjectRetention(r *http.Request) (*ObjectRetention, error) { + var retention ObjectRetention + if err := parseXML(r, &retention); err != nil { + return nil, err + } + return &retention, nil +} + +// parseObjectLegalHold parses XML legal hold configuration from request body +func parseObjectLegalHold(r *http.Request) (*ObjectLegalHold, error) { + var legalHold ObjectLegalHold + if err := parseXML(r, &legalHold); err != nil { + return nil, err + } + return &legalHold, nil +} + +// parseObjectLockConfiguration parses XML object lock configuration from request body +func parseObjectLockConfiguration(r *http.Request) (*ObjectLockConfiguration, error) { + var config ObjectLockConfiguration + if err := parseXML(r, &config); err != nil { + return nil, err + } + return &config, nil +} + +// validateRetention validates retention configuration +func validateRetention(retention *ObjectRetention) error { + // AWS requires both Mode and RetainUntilDate for PutObjectRetention + if retention.Mode == "" { + return fmt.Errorf("retention configuration must specify Mode") + } + + if retention.RetainUntilDate == nil { + return fmt.Errorf("retention configuration must specify RetainUntilDate") + } + + if retention.Mode != s3_constants.RetentionModeGovernance && retention.Mode != s3_constants.RetentionModeCompliance { + return fmt.Errorf("invalid retention mode: %s", retention.Mode) + } + + if retention.RetainUntilDate.Before(time.Now()) { + return fmt.Errorf("retain until date must be in the future") + } + + return nil +} + +// validateLegalHold validates legal hold configuration +func validateLegalHold(legalHold *ObjectLegalHold) error { + if legalHold.Status != s3_constants.LegalHoldOn && legalHold.Status != s3_constants.LegalHoldOff { + return fmt.Errorf("invalid legal hold status: %s", legalHold.Status) + } + + return nil +} + +// validateObjectLockConfiguration validates object lock configuration +func validateObjectLockConfiguration(config *ObjectLockConfiguration) error { + // ObjectLockEnabled is required for bucket-level configuration + if config.ObjectLockEnabled == "" { + return fmt.Errorf("object lock configuration must specify ObjectLockEnabled") + } + + // Validate ObjectLockEnabled value + if config.ObjectLockEnabled != s3_constants.ObjectLockEnabled { + return fmt.Errorf("invalid object lock enabled value: %s", config.ObjectLockEnabled) + } + + // Validate Rule if present + if config.Rule != nil { + if config.Rule.DefaultRetention == nil { + return fmt.Errorf("rule configuration must specify DefaultRetention") + } + return validateDefaultRetention(config.Rule.DefaultRetention) + } + + return nil +} + +// validateDefaultRetention validates default retention configuration +func validateDefaultRetention(retention *DefaultRetention) error { + // Mode is required + if retention.Mode == "" { + return fmt.Errorf("default retention must specify Mode") + } + + // Mode must be valid + if retention.Mode != s3_constants.RetentionModeGovernance && retention.Mode != s3_constants.RetentionModeCompliance { + return fmt.Errorf("invalid default retention mode: %s", retention.Mode) + } + + // Exactly one of Days or Years must be specified + if retention.Days == 0 && retention.Years == 0 { + return fmt.Errorf("default retention must specify either Days or Years") + } + + if retention.Days > 0 && retention.Years > 0 { + return fmt.Errorf("default retention cannot specify both Days and Years") + } + + // Validate ranges + if retention.Days < 0 || retention.Days > MaxRetentionDays { + return fmt.Errorf("default retention days must be between 0 and %d", MaxRetentionDays) + } + + if retention.Years < 0 || retention.Years > MaxRetentionYears { + return fmt.Errorf("default retention years must be between 0 and %d", MaxRetentionYears) + } + + return nil +} + +// getObjectEntry retrieves the appropriate object entry based on versioning and versionId +func (s3a *S3ApiServer) getObjectEntry(bucket, object, versionId string) (*filer_pb.Entry, error) { + var entry *filer_pb.Entry + var err error + + if versionId != "" { + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + } else { + // Check if versioning is enabled + versioningEnabled, vErr := s3a.isVersioningEnabled(bucket) + if vErr != nil { + return nil, fmt.Errorf("error checking versioning: %v", vErr) + } + + if versioningEnabled { + entry, err = s3a.getLatestObjectVersion(bucket, object) + } else { + bucketDir := s3a.option.BucketsPath + "/" + bucket + entry, err = s3a.getEntry(bucketDir, object) + } + } + + if err != nil { + return nil, fmt.Errorf("failed to retrieve object %s/%s: %w", bucket, object, ErrObjectNotFound) + } + + return entry, nil +} + +// getObjectRetention retrieves retention configuration from object metadata +func (s3a *S3ApiServer) getObjectRetention(bucket, object, versionId string) (*ObjectRetention, error) { + entry, err := s3a.getObjectEntry(bucket, object, versionId) + if err != nil { + return nil, err + } + + if entry.Extended == nil { + return nil, ErrNoRetentionConfiguration + } + + retention := &ObjectRetention{} + + if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists { + retention.Mode = string(modeBytes) + } + + if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists { + if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil { + t := time.Unix(timestamp, 0) + retention.RetainUntilDate = &t + } else { + return nil, fmt.Errorf("failed to parse retention timestamp for %s/%s: corrupted timestamp data", bucket, object) + } + } + + if retention.Mode == "" || retention.RetainUntilDate == nil { + return nil, ErrNoRetentionConfiguration + } + + return retention, nil +} + +// setObjectRetention sets retention configuration on object metadata +func (s3a *S3ApiServer) setObjectRetention(bucket, object, versionId string, retention *ObjectRetention, bypassGovernance bool) error { + var entry *filer_pb.Entry + var err error + var entryPath string + + if versionId != "" { + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + if err != nil { + return fmt.Errorf("failed to get version %s for object %s/%s: %w", versionId, bucket, object, ErrVersionNotFound) + } + entryPath = object + ".versions/" + s3a.getVersionFileName(versionId) + } else { + // Check if versioning is enabled + versioningEnabled, vErr := s3a.isVersioningEnabled(bucket) + if vErr != nil { + return fmt.Errorf("error checking versioning: %v", vErr) + } + + if versioningEnabled { + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + return fmt.Errorf("failed to get latest version for object %s/%s: %w", bucket, object, ErrLatestVersionNotFound) + } + // Extract version ID from entry metadata + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + versionId = string(versionIdBytes) + entryPath = object + ".versions/" + s3a.getVersionFileName(versionId) + } + } + } else { + bucketDir := s3a.option.BucketsPath + "/" + bucket + entry, err = s3a.getEntry(bucketDir, object) + if err != nil { + return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound) + } + entryPath = object + } + } + + // Check if object is already under retention + if entry.Extended != nil { + if existingMode, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists { + if string(existingMode) == s3_constants.RetentionModeCompliance && !bypassGovernance { + return fmt.Errorf("cannot modify retention on object under COMPLIANCE mode") + } + + if existingDateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists { + if timestamp, err := strconv.ParseInt(string(existingDateBytes), 10, 64); err == nil { + existingDate := time.Unix(timestamp, 0) + if existingDate.After(time.Now()) && string(existingMode) == s3_constants.RetentionModeGovernance && !bypassGovernance { + return fmt.Errorf("cannot modify retention on object under GOVERNANCE mode without bypass") + } + } + } + } + } + + // Update retention metadata + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + + if retention.Mode != "" { + entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(retention.Mode) + } + + if retention.RetainUntilDate != nil { + entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(retention.RetainUntilDate.Unix(), 10)) + + // Also update the existing WORM fields for compatibility + entry.WormEnforcedAtTsNs = time.Now().UnixNano() + } + + // Update the entry + // NOTE: Potential race condition exists if concurrent calls to PutObjectRetention + // and PutObjectLegalHold update the same object simultaneously, as they might + // overwrite each other's Extended map changes. This is mitigated by the fact + // that mkFile operations are typically serialized at the filer level, but + // future implementations might consider using atomic update operations or + // entry-level locking for complete safety. + bucketDir := s3a.option.BucketsPath + "/" + bucket + return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = entry.Extended + updatedEntry.WormEnforcedAtTsNs = entry.WormEnforcedAtTsNs + }) +} + +// getObjectLegalHold retrieves legal hold configuration from object metadata +func (s3a *S3ApiServer) getObjectLegalHold(bucket, object, versionId string) (*ObjectLegalHold, error) { + entry, err := s3a.getObjectEntry(bucket, object, versionId) + if err != nil { + return nil, err + } + + if entry.Extended == nil { + return nil, ErrNoLegalHoldConfiguration + } + + legalHold := &ObjectLegalHold{} + + if statusBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists { + legalHold.Status = string(statusBytes) + } else { + return nil, ErrNoLegalHoldConfiguration + } + + return legalHold, nil +} + +// setObjectLegalHold sets legal hold configuration on object metadata +func (s3a *S3ApiServer) setObjectLegalHold(bucket, object, versionId string, legalHold *ObjectLegalHold) error { + var entry *filer_pb.Entry + var err error + var entryPath string + + if versionId != "" { + entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) + if err != nil { + return fmt.Errorf("failed to get version %s for object %s/%s: %w", versionId, bucket, object, ErrVersionNotFound) + } + entryPath = object + ".versions/" + s3a.getVersionFileName(versionId) + } else { + // Check if versioning is enabled + versioningEnabled, vErr := s3a.isVersioningEnabled(bucket) + if vErr != nil { + return fmt.Errorf("error checking versioning: %v", vErr) + } + + if versioningEnabled { + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + return fmt.Errorf("failed to get latest version for object %s/%s: %w", bucket, object, ErrLatestVersionNotFound) + } + // Extract version ID from entry metadata + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + versionId = string(versionIdBytes) + entryPath = object + ".versions/" + s3a.getVersionFileName(versionId) + } + } + } else { + bucketDir := s3a.option.BucketsPath + "/" + bucket + entry, err = s3a.getEntry(bucketDir, object) + if err != nil { + return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound) + } + entryPath = object + } + } + + // Update legal hold metadata + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + + entry.Extended[s3_constants.ExtLegalHoldKey] = []byte(legalHold.Status) + + // Update the entry + // NOTE: Potential race condition exists if concurrent calls to PutObjectRetention + // and PutObjectLegalHold update the same object simultaneously, as they might + // overwrite each other's Extended map changes. This is mitigated by the fact + // that mkFile operations are typically serialized at the filer level, but + // future implementations might consider using atomic update operations or + // entry-level locking for complete safety. + bucketDir := s3a.option.BucketsPath + "/" + bucket + return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) { + updatedEntry.Extended = entry.Extended + }) +} + +// isObjectRetentionActive checks if an object is currently under retention +func (s3a *S3ApiServer) isObjectRetentionActive(bucket, object, versionId string) (bool, error) { + retention, err := s3a.getObjectRetention(bucket, object, versionId) + if err != nil { + // If no retention found, object is not under retention + if errors.Is(err, ErrNoRetentionConfiguration) { + return false, nil + } + return false, err + } + + if retention.RetainUntilDate != nil && retention.RetainUntilDate.After(time.Now()) { + return true, nil + } + + return false, nil +} + +// getObjectRetentionWithStatus retrieves retention configuration and returns both the data and active status +// This is an optimization to avoid duplicate fetches when both retention data and status are needed +func (s3a *S3ApiServer) getObjectRetentionWithStatus(bucket, object, versionId string) (*ObjectRetention, bool, error) { + retention, err := s3a.getObjectRetention(bucket, object, versionId) + if err != nil { + // If no retention found, object is not under retention + if errors.Is(err, ErrNoRetentionConfiguration) { + return nil, false, nil + } + return nil, false, err + } + + // Check if retention is currently active + isActive := retention.RetainUntilDate != nil && retention.RetainUntilDate.After(time.Now()) + return retention, isActive, nil +} + +// isObjectLegalHoldActive checks if an object is currently under legal hold +func (s3a *S3ApiServer) isObjectLegalHoldActive(bucket, object, versionId string) (bool, error) { + legalHold, err := s3a.getObjectLegalHold(bucket, object, versionId) + if err != nil { + // If no legal hold found, object is not under legal hold + if errors.Is(err, ErrNoLegalHoldConfiguration) { + return false, nil + } + return false, err + } + + return legalHold.Status == s3_constants.LegalHoldOn, nil +} + +// checkObjectLockPermissions checks if an object can be deleted or modified +func (s3a *S3ApiServer) checkObjectLockPermissions(bucket, object, versionId string, bypassGovernance bool) error { + // Get retention configuration and status in a single call to avoid duplicate fetches + retention, retentionActive, err := s3a.getObjectRetentionWithStatus(bucket, object, versionId) + if err != nil { + glog.Warningf("Error checking retention for %s/%s: %v", bucket, object, err) + } + + // Check if object is under legal hold + legalHoldActive, err := s3a.isObjectLegalHoldActive(bucket, object, versionId) + if err != nil { + glog.Warningf("Error checking legal hold for %s/%s: %v", bucket, object, err) + } + + // If object is under legal hold, it cannot be deleted or modified + if legalHoldActive { + return fmt.Errorf("object is under legal hold and cannot be deleted or modified") + } + + // If object is under retention, check the mode + if retentionActive && retention != nil { + if retention.Mode == s3_constants.RetentionModeCompliance { + return ErrComplianceModeActive + } + + if retention.Mode == s3_constants.RetentionModeGovernance && !bypassGovernance { + return ErrGovernanceModeActive + } + } + + return nil +} + +// isObjectLockAvailable checks if Object Lock features are available for the bucket +// Object Lock requires versioning to be enabled (AWS S3 requirement) +func (s3a *S3ApiServer) isObjectLockAvailable(bucket string) error { + versioningEnabled, err := s3a.isVersioningEnabled(bucket) + if err != nil { + if errors.Is(err, filer_pb.ErrNotFound) { + return ErrBucketNotFound + } + return fmt.Errorf("error checking versioning status: %v", err) + } + + if !versioningEnabled { + return fmt.Errorf("object lock requires versioning to be enabled") + } + + return nil +} + +// checkObjectLockPermissionsForPut checks object lock permissions for PUT operations +// This is a shared helper to avoid code duplication in PUT handlers +func (s3a *S3ApiServer) checkObjectLockPermissionsForPut(bucket, object string, bypassGovernance bool, versioningEnabled bool) error { + // Object Lock only applies to versioned buckets (AWS S3 requirement) + if !versioningEnabled { + return nil + } + + // For PUT operations, we check permissions on the current object (empty versionId) + if err := s3a.checkObjectLockPermissions(bucket, object, "", bypassGovernance); err != nil { + glog.V(2).Infof("checkObjectLockPermissionsForPut: object lock check failed for %s/%s: %v", bucket, object, err) + return err + } + return nil +} + +// handleObjectLockAvailabilityCheck is a helper function to check object lock availability +// and write the appropriate error response if not available. This reduces code duplication +// across all retention handlers. +func (s3a *S3ApiServer) handleObjectLockAvailabilityCheck(w http.ResponseWriter, r *http.Request, bucket, handlerName string) bool { + if err := s3a.isObjectLockAvailable(bucket); err != nil { + glog.Errorf("%s: object lock not available for bucket %s: %v", handlerName, bucket, err) + if errors.Is(err, ErrBucketNotFound) { + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket) + } else { + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) + } + return false + } + return true +} |
