aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_retention.go
blob: 14fc0d28387aa6826b91f3b9a6c2a6e9c8a0e548 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
package s3api

import (
	"encoding/xml"
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

// Sentinel errors for proper error handling instead of string matching
var (
	ErrNoRetentionConfiguration = errors.New("no retention configuration found")
	ErrNoLegalHoldConfiguration = errors.New("no legal hold configuration found")
	ErrBucketNotFound           = errors.New("bucket not found")
	ErrObjectNotFound           = errors.New("object not found")
	ErrVersionNotFound          = errors.New("version not found")
	ErrLatestVersionNotFound    = errors.New("latest version not found")
	ErrComplianceModeActive     = errors.New("object is under COMPLIANCE mode retention and cannot be deleted or modified")
	ErrGovernanceModeActive     = errors.New("object is under GOVERNANCE mode retention and cannot be deleted or modified without bypass")
)

// Error definitions for Object Lock
var (
	ErrObjectUnderLegalHold         = errors.New("object is under legal hold and cannot be deleted or modified")
	ErrGovernanceBypassNotPermitted = errors.New("user does not have permission to bypass governance retention")
)

const (
	// Maximum retention period limits according to AWS S3 specifications
	MaxRetentionDays  = 36500 // Maximum number of days for object retention (100 years)
	MaxRetentionYears = 100   // Maximum number of years for object retention
)

// ObjectRetention represents S3 Object Retention configuration
type ObjectRetention struct {
	XMLName         xml.Name   `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Retention"`
	Mode            string     `xml:"Mode,omitempty"`
	RetainUntilDate *time.Time `xml:"RetainUntilDate,omitempty"`
}

// ObjectLegalHold represents S3 Object Legal Hold configuration
type ObjectLegalHold struct {
	XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LegalHold"`
	Status  string   `xml:"Status,omitempty"`
}

// ObjectLockConfiguration represents S3 Object Lock Configuration
type ObjectLockConfiguration struct {
	XMLName           xml.Name        `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ObjectLockConfiguration"`
	ObjectLockEnabled string          `xml:"ObjectLockEnabled,omitempty"`
	Rule              *ObjectLockRule `xml:"Rule,omitempty"`
}

// ObjectLockRule represents an Object Lock Rule
type ObjectLockRule struct {
	XMLName          xml.Name          `xml:"Rule"`
	DefaultRetention *DefaultRetention `xml:"DefaultRetention,omitempty"`
}

// DefaultRetention represents default retention settings
type DefaultRetention struct {
	XMLName xml.Name `xml:"DefaultRetention"`
	Mode    string   `xml:"Mode,omitempty"`
	Days    int      `xml:"Days,omitempty"`
	Years   int      `xml:"Years,omitempty"`
}

// Custom time unmarshalling for AWS S3 ISO8601 format
func (or *ObjectRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error {
	type Alias ObjectRetention
	aux := &struct {
		*Alias
		RetainUntilDate *string `xml:"RetainUntilDate,omitempty"`
	}{
		Alias: (*Alias)(or),
	}

	if err := d.DecodeElement(aux, &start); err != nil {
		return err
	}

	if aux.RetainUntilDate != nil {
		t, err := time.Parse(time.RFC3339, *aux.RetainUntilDate)
		if err != nil {
			return err
		}
		or.RetainUntilDate = &t
	}

	return nil
}

// parseXML is a generic helper function to parse XML from an HTTP request body.
// It uses xml.Decoder for streaming XML parsing, which is more memory-efficient
// and avoids loading the entire request body into memory.
//
// The function assumes:
// - The request body is not nil (returns error if it is)
// - The request body will be closed after parsing (deferred close)
// - The XML content matches the structure of the provided result type T
//
// This approach is optimized for small XML payloads typical in S3 API requests
// (retention configurations, legal hold settings, etc.) where the overhead of
// streaming parsing is acceptable for the memory efficiency benefits.
func parseXML[T any](request *http.Request, result *T) error {
	if request.Body == nil {
		return fmt.Errorf("error parsing XML: empty request body")
	}
	defer request.Body.Close()

	decoder := xml.NewDecoder(request.Body)
	if err := decoder.Decode(result); err != nil {
		return fmt.Errorf("error parsing XML: %w", err)
	}

	return nil
}

// parseObjectRetention parses XML retention configuration from request body
func parseObjectRetention(request *http.Request) (*ObjectRetention, error) {
	var retention ObjectRetention
	if err := parseXML(request, &retention); err != nil {
		return nil, err
	}
	return &retention, nil
}

// parseObjectLegalHold parses XML legal hold configuration from request body
func parseObjectLegalHold(request *http.Request) (*ObjectLegalHold, error) {
	var legalHold ObjectLegalHold
	if err := parseXML(request, &legalHold); err != nil {
		return nil, err
	}
	return &legalHold, nil
}

// parseObjectLockConfiguration parses XML object lock configuration from request body
func parseObjectLockConfiguration(request *http.Request) (*ObjectLockConfiguration, error) {
	var config ObjectLockConfiguration
	if err := parseXML(request, &config); err != nil {
		return nil, err
	}
	return &config, nil
}

// validateRetention validates retention configuration
func validateRetention(retention *ObjectRetention) error {
	// AWS requires both Mode and RetainUntilDate for PutObjectRetention
	if retention.Mode == "" {
		return fmt.Errorf("retention configuration must specify Mode")
	}

	if retention.RetainUntilDate == nil {
		return fmt.Errorf("retention configuration must specify RetainUntilDate")
	}

	if retention.Mode != s3_constants.RetentionModeGovernance && retention.Mode != s3_constants.RetentionModeCompliance {
		return fmt.Errorf("invalid retention mode: %s", retention.Mode)
	}

	if retention.RetainUntilDate.Before(time.Now()) {
		return fmt.Errorf("retain until date must be in the future")
	}

	return nil
}

// validateLegalHold validates legal hold configuration
func validateLegalHold(legalHold *ObjectLegalHold) error {
	if legalHold.Status != s3_constants.LegalHoldOn && legalHold.Status != s3_constants.LegalHoldOff {
		return fmt.Errorf("invalid legal hold status: %s", legalHold.Status)
	}

	return nil
}

// validateObjectLockConfiguration validates object lock configuration
func validateObjectLockConfiguration(config *ObjectLockConfiguration) error {
	// ObjectLockEnabled is required for bucket-level configuration
	if config.ObjectLockEnabled == "" {
		return fmt.Errorf("object lock configuration must specify ObjectLockEnabled")
	}

	// Validate ObjectLockEnabled value
	if config.ObjectLockEnabled != s3_constants.ObjectLockEnabled {
		return fmt.Errorf("invalid object lock enabled value: %s", config.ObjectLockEnabled)
	}

	// Validate Rule if present
	if config.Rule != nil {
		if config.Rule.DefaultRetention == nil {
			return fmt.Errorf("rule configuration must specify DefaultRetention")
		}
		return validateDefaultRetention(config.Rule.DefaultRetention)
	}

	return nil
}

// validateDefaultRetention validates default retention configuration
func validateDefaultRetention(retention *DefaultRetention) error {
	// Mode is required
	if retention.Mode == "" {
		return fmt.Errorf("default retention must specify Mode")
	}

	// Mode must be valid
	if retention.Mode != s3_constants.RetentionModeGovernance && retention.Mode != s3_constants.RetentionModeCompliance {
		return fmt.Errorf("invalid default retention mode: %s", retention.Mode)
	}

	// Exactly one of Days or Years must be specified
	if retention.Days == 0 && retention.Years == 0 {
		return fmt.Errorf("default retention must specify either Days or Years")
	}

	if retention.Days > 0 && retention.Years > 0 {
		return fmt.Errorf("default retention cannot specify both Days and Years")
	}

	// Validate ranges
	if retention.Days < 0 || retention.Days > MaxRetentionDays {
		return fmt.Errorf("default retention days must be between 0 and %d", MaxRetentionDays)
	}

	if retention.Years < 0 || retention.Years > MaxRetentionYears {
		return fmt.Errorf("default retention years must be between 0 and %d", MaxRetentionYears)
	}

	return nil
}

// getObjectEntry retrieves the appropriate object entry based on versioning and versionId
func (s3a *S3ApiServer) getObjectEntry(bucket, object, versionId string) (*filer_pb.Entry, error) {
	var entry *filer_pb.Entry
	var err error

	if versionId != "" {
		entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
	} else {
		// Check if versioning is enabled
		versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
		if vErr != nil {
			return nil, fmt.Errorf("error checking versioning: %w", vErr)
		}

		if versioningEnabled {
			entry, err = s3a.getLatestObjectVersion(bucket, object)
		} else {
			bucketDir := s3a.option.BucketsPath + "/" + bucket
			entry, err = s3a.getEntry(bucketDir, object)
		}
	}

	if err != nil {
		return nil, fmt.Errorf("failed to retrieve object %s/%s: %w", bucket, object, ErrObjectNotFound)
	}

	return entry, nil
}

// getObjectRetention retrieves retention configuration from object metadata
func (s3a *S3ApiServer) getObjectRetention(bucket, object, versionId string) (*ObjectRetention, error) {
	entry, err := s3a.getObjectEntry(bucket, object, versionId)
	if err != nil {
		return nil, err
	}

	if entry.Extended == nil {
		return nil, ErrNoRetentionConfiguration
	}

	retention := &ObjectRetention{}

	if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
		retention.Mode = string(modeBytes)
	}

	if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists {
		if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
			t := time.Unix(timestamp, 0)
			retention.RetainUntilDate = &t
		} else {
			return nil, fmt.Errorf("failed to parse retention timestamp for %s/%s: corrupted timestamp data", bucket, object)
		}
	}

	if retention.Mode == "" || retention.RetainUntilDate == nil {
		return nil, ErrNoRetentionConfiguration
	}

	return retention, nil
}

// setObjectRetention sets retention configuration on object metadata
func (s3a *S3ApiServer) setObjectRetention(bucket, object, versionId string, retention *ObjectRetention, bypassGovernance bool) error {
	var entry *filer_pb.Entry
	var err error
	var entryPath string

	if versionId != "" {
		entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
		if err != nil {
			return fmt.Errorf("failed to get version %s for object %s/%s: %w", versionId, bucket, object, ErrVersionNotFound)
		}
		entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
	} else {
		// Check if versioning is enabled
		versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
		if vErr != nil {
			return fmt.Errorf("error checking versioning: %w", vErr)
		}

		if versioningEnabled {
			entry, err = s3a.getLatestObjectVersion(bucket, object)
			if err != nil {
				return fmt.Errorf("failed to get latest version for object %s/%s: %w", bucket, object, ErrLatestVersionNotFound)
			}
			// Extract version ID from entry metadata
			if entry.Extended != nil {
				if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
					versionId = string(versionIdBytes)
					entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
				}
			}
		} else {
			bucketDir := s3a.option.BucketsPath + "/" + bucket
			entry, err = s3a.getEntry(bucketDir, object)
			if err != nil {
				return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound)
			}
			entryPath = object
		}
	}

	// Check if object is already under retention
	if entry.Extended != nil {
		if existingMode, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
			if string(existingMode) == s3_constants.RetentionModeCompliance && !bypassGovernance {
				return fmt.Errorf("cannot modify retention on object under COMPLIANCE mode")
			}

			if existingDateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
				if timestamp, err := strconv.ParseInt(string(existingDateBytes), 10, 64); err == nil {
					existingDate := time.Unix(timestamp, 0)
					if existingDate.After(time.Now()) && string(existingMode) == s3_constants.RetentionModeGovernance && !bypassGovernance {
						return fmt.Errorf("cannot modify retention on object under GOVERNANCE mode without bypass")
					}
				}
			}
		}
	}

	// Update retention metadata
	if entry.Extended == nil {
		entry.Extended = make(map[string][]byte)
	}

	if retention.Mode != "" {
		entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte(retention.Mode)
	}

	if retention.RetainUntilDate != nil {
		entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte(strconv.FormatInt(retention.RetainUntilDate.Unix(), 10))

		// Also update the existing WORM fields for compatibility
		entry.WormEnforcedAtTsNs = time.Now().UnixNano()
	}

	// Update the entry
	// NOTE: Potential race condition exists if concurrent calls to PutObjectRetention
	// and PutObjectLegalHold update the same object simultaneously, as they might
	// overwrite each other's Extended map changes. This is mitigated by the fact
	// that mkFile operations are typically serialized at the filer level, but
	// future implementations might consider using atomic update operations or
	// entry-level locking for complete safety.
	bucketDir := s3a.option.BucketsPath + "/" + bucket
	return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
		updatedEntry.Extended = entry.Extended
		updatedEntry.WormEnforcedAtTsNs = entry.WormEnforcedAtTsNs
	})
}

// getObjectLegalHold retrieves legal hold configuration from object metadata
func (s3a *S3ApiServer) getObjectLegalHold(bucket, object, versionId string) (*ObjectLegalHold, error) {
	entry, err := s3a.getObjectEntry(bucket, object, versionId)
	if err != nil {
		return nil, err
	}

	if entry.Extended == nil {
		return nil, ErrNoLegalHoldConfiguration
	}

	legalHold := &ObjectLegalHold{}

	if statusBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
		legalHold.Status = string(statusBytes)
	} else {
		return nil, ErrNoLegalHoldConfiguration
	}

	return legalHold, nil
}

// setObjectLegalHold sets legal hold configuration on object metadata
func (s3a *S3ApiServer) setObjectLegalHold(bucket, object, versionId string, legalHold *ObjectLegalHold) error {
	var entry *filer_pb.Entry
	var err error
	var entryPath string

	if versionId != "" {
		entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
		if err != nil {
			return fmt.Errorf("failed to get version %s for object %s/%s: %w", versionId, bucket, object, ErrVersionNotFound)
		}
		entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
	} else {
		// Check if versioning is enabled
		versioningEnabled, vErr := s3a.isVersioningEnabled(bucket)
		if vErr != nil {
			return fmt.Errorf("error checking versioning: %w", vErr)
		}

		if versioningEnabled {
			entry, err = s3a.getLatestObjectVersion(bucket, object)
			if err != nil {
				return fmt.Errorf("failed to get latest version for object %s/%s: %w", bucket, object, ErrLatestVersionNotFound)
			}
			// Extract version ID from entry metadata
			if entry.Extended != nil {
				if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
					versionId = string(versionIdBytes)
					entryPath = object + ".versions/" + s3a.getVersionFileName(versionId)
				}
			}
		} else {
			bucketDir := s3a.option.BucketsPath + "/" + bucket
			entry, err = s3a.getEntry(bucketDir, object)
			if err != nil {
				return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound)
			}
			entryPath = object
		}
	}

	// Update legal hold metadata
	if entry.Extended == nil {
		entry.Extended = make(map[string][]byte)
	}

	entry.Extended[s3_constants.ExtLegalHoldKey] = []byte(legalHold.Status)

	// Update the entry
	// NOTE: Potential race condition exists if concurrent calls to PutObjectRetention
	// and PutObjectLegalHold update the same object simultaneously, as they might
	// overwrite each other's Extended map changes. This is mitigated by the fact
	// that mkFile operations are typically serialized at the filer level, but
	// future implementations might consider using atomic update operations or
	// entry-level locking for complete safety.
	bucketDir := s3a.option.BucketsPath + "/" + bucket
	return s3a.mkFile(bucketDir, entryPath, entry.Chunks, func(updatedEntry *filer_pb.Entry) {
		updatedEntry.Extended = entry.Extended
	})
}

// isObjectRetentionActive checks if an object is currently under retention
func (s3a *S3ApiServer) isObjectRetentionActive(bucket, object, versionId string) (bool, error) {
	retention, err := s3a.getObjectRetention(bucket, object, versionId)
	if err != nil {
		// If no retention found, object is not under retention
		if errors.Is(err, ErrNoRetentionConfiguration) {
			return false, nil
		}
		return false, err
	}

	if retention.RetainUntilDate != nil && retention.RetainUntilDate.After(time.Now()) {
		return true, nil
	}

	return false, nil
}

// getRetentionFromEntry extracts retention configuration from an existing entry
func (s3a *S3ApiServer) getRetentionFromEntry(entry *filer_pb.Entry) (*ObjectRetention, bool, error) {
	if entry.Extended == nil {
		return nil, false, nil
	}

	retention := &ObjectRetention{}

	if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
		retention.Mode = string(modeBytes)
	}

	if dateBytes, exists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; exists {
		if timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64); err == nil {
			t := time.Unix(timestamp, 0)
			retention.RetainUntilDate = &t
		} else {
			return nil, false, fmt.Errorf("failed to parse retention timestamp: corrupted timestamp data")
		}
	}

	if retention.Mode == "" || retention.RetainUntilDate == nil {
		return nil, false, nil
	}

	// Check if retention is currently active
	isActive := retention.RetainUntilDate.After(time.Now())
	return retention, isActive, nil
}

// getLegalHoldFromEntry extracts legal hold configuration from an existing entry
func (s3a *S3ApiServer) getLegalHoldFromEntry(entry *filer_pb.Entry) (*ObjectLegalHold, bool, error) {
	if entry.Extended == nil {
		return nil, false, nil
	}

	legalHold := &ObjectLegalHold{}

	if statusBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
		legalHold.Status = string(statusBytes)
	} else {
		return nil, false, nil
	}

	isActive := legalHold.Status == s3_constants.LegalHoldOn
	return legalHold, isActive, nil
}

// checkGovernanceBypassPermission checks if the user has permission to bypass governance retention
func (s3a *S3ApiServer) checkGovernanceBypassPermission(request *http.Request, bucket, object string) bool {
	// Use the existing IAM auth system to check the specific permission
	// Create the governance bypass action with proper bucket/object concatenation
	// Note: path.Join would drop bucket if object has leading slash, so use explicit formatting
	resource := fmt.Sprintf("%s/%s", bucket, strings.TrimPrefix(object, "/"))
	action := Action(fmt.Sprintf("%s:%s", s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION, resource))

	// Use the IAM system to authenticate and authorize this specific action
	identity, errCode := s3a.iam.authRequest(request, action)
	if errCode != s3err.ErrNone {
		glog.V(3).Infof("IAM auth failed for governance bypass: %v", errCode)
		return false
	}

	// Verify that the authenticated identity can perform this action
	if identity != nil && identity.canDo(action, bucket, object) {
		return true
	}

	// Additional check: allow users with Admin action to bypass governance retention
	// Use the proper S3 Admin action constant instead of generic isAdmin() method
	adminAction := Action(fmt.Sprintf("%s:%s", s3_constants.ACTION_ADMIN, resource))
	if identity != nil && identity.canDo(adminAction, bucket, object) {
		glog.V(2).Infof("Admin user %s granted governance bypass permission for %s/%s", identity.Name, bucket, object)
		return true
	}

	return false
}

// checkObjectLockPermissions checks if an object can be deleted or modified
func (s3a *S3ApiServer) checkObjectLockPermissions(request *http.Request, bucket, object, versionId string, bypassGovernance bool) error {
	// Get the object entry once to check both retention and legal hold
	entry, err := s3a.getObjectEntry(bucket, object, versionId)
	if err != nil {
		// If object doesn't exist, it's not under retention or legal hold - this is expected during delete operations
		if errors.Is(err, ErrObjectNotFound) || errors.Is(err, ErrVersionNotFound) || errors.Is(err, ErrLatestVersionNotFound) {
			// Object doesn't exist, so it can't be under retention or legal hold - this is normal
			glog.V(4).Infof("Object %s/%s (versionId: %s) not found during object lock check (expected during delete operations)", bucket, object, versionId)
			return nil
		}
		glog.Warningf("Error retrieving object %s/%s (versionId: %s) for lock check: %v", bucket, object, versionId, err)
		return err
	}

	// Extract retention information from the entry
	retention, retentionActive, err := s3a.getRetentionFromEntry(entry)
	if err != nil {
		glog.Warningf("Error parsing retention for %s/%s (versionId: %s): %v", bucket, object, versionId, err)
		// Continue with legal hold check even if retention parsing fails
	}

	// Extract legal hold information from the entry
	_, legalHoldActive, err := s3a.getLegalHoldFromEntry(entry)
	if err != nil {
		glog.Warningf("Error parsing legal hold for %s/%s (versionId: %s): %v", bucket, object, versionId, err)
		// Continue with retention check even if legal hold parsing fails
	}

	// If object is under legal hold, it cannot be deleted or modified
	if legalHoldActive {
		return ErrObjectUnderLegalHold
	}

	// If object is under retention, check the mode
	if retentionActive && retention != nil {
		if retention.Mode == s3_constants.RetentionModeCompliance {
			return ErrComplianceModeActive
		}

		if retention.Mode == s3_constants.RetentionModeGovernance {
			if !bypassGovernance {
				return ErrGovernanceModeActive
			}

			// If bypass is requested, check if user has permission
			if !s3a.checkGovernanceBypassPermission(request, bucket, object) {
				glog.V(2).Infof("User does not have s3:BypassGovernanceRetention permission for %s/%s", bucket, object)
				return ErrGovernanceBypassNotPermitted
			}
		}
	}

	return nil
}

// isObjectLockAvailable checks if Object Lock features are available for the bucket
// Object Lock requires versioning to be enabled (AWS S3 requirement)
func (s3a *S3ApiServer) isObjectLockAvailable(bucket string) error {
	versioningEnabled, err := s3a.isVersioningEnabled(bucket)
	if err != nil {
		if errors.Is(err, filer_pb.ErrNotFound) {
			return ErrBucketNotFound
		}
		return fmt.Errorf("error checking versioning status: %w", err)
	}

	if !versioningEnabled {
		return fmt.Errorf("object lock requires versioning to be enabled")
	}

	return nil
}

// handleObjectLockAvailabilityCheck is a helper function to check object lock availability
// and write the appropriate error response if not available. This reduces code duplication
// across all retention handlers.
func (s3a *S3ApiServer) handleObjectLockAvailabilityCheck(w http.ResponseWriter, request *http.Request, bucket, handlerName string) bool {
	if err := s3a.isObjectLockAvailable(bucket); err != nil {
		glog.Errorf("%s: object lock not available for bucket %s: %v", handlerName, bucket, err)
		if errors.Is(err, ErrBucketNotFound) {
			s3err.WriteErrorResponse(w, request, s3err.ErrNoSuchBucket)
		} else {
			s3err.WriteErrorResponse(w, request, s3err.ErrInvalidRequest)
		}
		return false
	}
	return true
}