path: root/weed/s3api/s3api_bucket_handlers.go
Diffstat (limited to 'weed/s3api/s3api_bucket_handlers.go')
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go  158
1 file changed, 10 insertions(+), 148 deletions(-)
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 2d67aa551..5ff155890 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -9,9 +9,7 @@ import (
"fmt"
"math"
"net/http"
- "path"
"sort"
- "strconv"
"strings"
"time"
@@ -336,7 +334,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
// If object lock is enabled, check for objects with active locks
if bucketConfig.ObjectLockConfig != nil {
- hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket)
+ hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket)
if checkErr != nil {
glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
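The only functional change in this hunk is threading the HTTP request's context into the lock scan, so a client disconnect or timeout cancels the underlying filer listing instead of letting a large scan run to completion. A minimal sketch of the pattern, assuming a hypothetical slowScan in place of the real lock check:

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// slowScan stands in for a long-running, paginated scan such as the
// bucket lock check. It aborts as soon as ctx is done.
func slowScan(ctx context.Context, pages int) error {
	for i := 0; i < pages; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err() // client went away or deadline hit
		case <-time.After(10 * time.Millisecond): // one "page" of work
		}
	}
	return nil
}

func handler(w http.ResponseWriter, r *http.Request) {
	// r.Context() is canceled when the client disconnects.
	if err := slowScan(r.Context(), 1000); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintln(w, "ok")
}

func main() {
	http.HandleFunc("/check", handler)
	http.ListenAndServe(":8080", nil)
}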
@@ -400,158 +398,22 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
}
// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
-func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
+// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go
+func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) {
bucketPath := s3a.option.BucketsPath + "/" + bucket
+ var hasLocks bool
+ var checkErr error
- // Check all objects including versions for active locks
- // Establish current time once at the start for consistency across the entire scan
- hasLocks := false
- currentTime := time.Now()
- err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime)
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath)
+ return checkErr
+ })
if err != nil {
- return false, fmt.Errorf("error checking for locked objects: %w", err)
+ return false, err
}
-
return hasLocks, nil
}
-const (
- // lockCheckPaginationSize is the page size for listing directories during lock checks
- lockCheckPaginationSize = 10000
-)
-
-// errStopPagination is a sentinel error to signal early termination of pagination
-var errStopPagination = errors.New("stop pagination")
-
-// paginateEntries iterates through directory entries with pagination
-// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully.
-func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error {
- startFrom := ""
- for {
- entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize)
- if err != nil {
- // Fail-safe: propagate error to prevent incorrect bucket deletion
- return fmt.Errorf("failed to list directory %s: %w", dir, err)
- }
-
- if err := fn(entries); err != nil {
- if errors.Is(err, errStopPagination) {
- return nil
- }
- return err
- }
-
- if isLast || len(entries) == 0 {
- break
- }
- // Use the last entry name as the start point for next page
- startFrom = entries[len(entries)-1].Name
- }
- return nil
-}
-
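The deleted paginateEntries relied on a sentinel error so a callback could stop iteration early without reporting failure; errors.Is separates the sentinel from genuine errors. A self-contained sketch of that pattern over an in-memory lister:

package main

import (
	"errors"
	"fmt"
)

// errStopPagination signals deliberate early termination, not failure.
var errStopPagination = errors.New("stop pagination")

// paginate feeds items to fn in fixed-size pages. If fn returns
// errStopPagination, iteration ends successfully.
func paginate(items []string, pageSize int, fn func(page []string) error) error {
	for start := 0; start < len(items); start += pageSize {
		end := start + pageSize
		if end > len(items) {
			end = len(items)
		}
		if err := fn(items[start:end]); err != nil {
			if errors.Is(err, errStopPagination) {
				return nil // early stop requested by the callback
			}
			return err // a real error propagates to the caller
		}
	}
	return nil
}

func main() {
	items := []string{"a", "b", "locked", "c", "d"}
	found := false
	err := paginate(items, 2, func(page []string) error {
		for _, it := range page {
			if it == "locked" {
				found = true
				return errStopPagination
			}
		}
		return nil
	})
	fmt.Println(found, err) // true <nil>
}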
-// recursivelyCheckLocks recursively checks all objects and versions for active locks
-// Uses pagination to handle directories with more than 10,000 entries
-func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error {
- if *hasLocks {
- // Early exit if we've already found a locked object
- return nil
- }
-
- // Process entries in the current directory with pagination
- err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error {
- for _, entry := range entries {
- if *hasLocks {
- // Early exit if we've already found a locked object
- return errStopPagination
- }
-
- // Skip special directories (multipart uploads, etc)
- if entry.Name == s3_constants.MultipartUploadsFolder {
- continue
- }
-
- if entry.IsDirectory {
- subDir := path.Join(dir, entry.Name)
- if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
- // If it's a .versions directory, check all version files with pagination
- err := s3a.paginateEntries(subDir, func(versionEntries []*filer_pb.Entry) error {
- for _, versionEntry := range versionEntries {
- if s3a.entryHasActiveLock(versionEntry, currentTime) {
- *hasLocks = true
- glog.V(2).Infof("Found object with active lock in versions: %s/%s", subDir, versionEntry.Name)
- return errStopPagination
- }
- }
- return nil
- })
- if err != nil {
- return err
- }
- } else {
- // Recursively check other subdirectories
- subRelativePath := path.Join(relativePath, entry.Name)
- if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil {
- return err
- }
- // Early exit if a locked object was found in the subdirectory
- if *hasLocks {
- return errStopPagination
- }
- }
- } else {
- // Check regular files for locks
- if s3a.entryHasActiveLock(entry, currentTime) {
- *hasLocks = true
- objectPath := path.Join(relativePath, entry.Name)
- glog.V(2).Infof("Found object with active lock: %s", objectPath)
- return errStopPagination
- }
- }
- }
- return nil
- })
-
- return err
-}
-
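recursivelyCheckLocks threaded a shared *bool through the recursion so that any branch finding a lock let every level bail out, and it pinned a single currentTime so an object's lock status could not change mid-traversal. A compact sketch of the same shape over an in-memory tree:

package main

import (
	"fmt"
	"time"
)

type node struct {
	name        string
	retainUntil time.Time // zero value = no retention
	children    []*node
}

// walk visits the tree depth-first. found is shared across all
// recursive calls so every level can short-circuit once a locked
// entry turns up; now pins a single timestamp for the whole scan.
func walk(n *node, found *bool, now time.Time) {
	if *found {
		return // some other branch already found a lock
	}
	if n.retainUntil.After(now) { // retention still active
		*found = true
		return
	}
	for _, c := range n.children {
		walk(c, found, now)
		if *found {
			return
		}
	}
}

func main() {
	root := &node{name: "bucket", children: []*node{
		{name: "a"},
		{name: ".versions", children: []*node{
			{name: "v1", retainUntil: time.Now().Add(time.Hour)},
		}},
		{name: "b"},
	}}
	found := false
	walk(root, &found, time.Now())
	fmt.Println(found) // true
}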
-// entryHasActiveLock checks if an entry has an active retention or legal hold
-func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
- if entry.Extended == nil {
- return false
- }
-
- // Check for active legal hold
- if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
- if string(legalHoldBytes) == s3_constants.LegalHoldOn {
- return true
- }
- }
-
- // Check for active retention
- if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
- mode := string(modeBytes)
- if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
- // Check if retention is still active
- if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
- timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64)
- if err != nil {
- // Fail-safe: if we can't parse the retention date, assume the object is locked
- // to prevent accidental data loss
- glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err)
- return true
- }
- retainUntil := time.Unix(timestamp, 0)
- if retainUntil.After(currentTime) {
- return true
- }
- }
- }
- }
-
- return false
-}
-
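entryHasActiveLock is the actual lock test the scan applied: a legal hold locks unconditionally, retention locks only with a COMPLIANCE or GOVERNANCE mode plus an unexpired retain-until timestamp, and an unparseable timestamp fails safe to "locked". The same decision logic as a standalone function over a plain map, with key names shortened for the sketch:

package main

import (
	"fmt"
	"strconv"
	"time"
)

// hasActiveLock mirrors the decision logic of the deleted
// entryHasActiveLock, over a plain attribute map.
func hasActiveLock(attrs map[string]string, now time.Time) bool {
	// An explicit legal hold locks the object unconditionally.
	if attrs["legal-hold"] == "ON" {
		return true
	}
	// Retention locks only in COMPLIANCE or GOVERNANCE mode.
	mode := attrs["lock-mode"]
	if mode != "COMPLIANCE" && mode != "GOVERNANCE" {
		return false
	}
	dateStr, ok := attrs["retain-until"]
	if !ok {
		return false
	}
	ts, err := strconv.ParseInt(dateStr, 10, 64)
	if err != nil {
		// Fail-safe: treat an unparseable date as still locked.
		return true
	}
	return time.Unix(ts, 0).After(now)
}

func main() {
	now := time.Now()
	expired := map[string]string{
		"lock-mode":    "GOVERNANCE",
		"retain-until": strconv.FormatInt(now.Add(-time.Hour).Unix(), 10),
	}
	active := map[string]string{
		"lock-mode":    "COMPLIANCE",
		"retain-until": strconv.FormatInt(now.Add(time.Hour).Unix(), 10),
	}
	fmt.Println(hasActiveLock(expired, now)) // false
	fmt.Println(hasActiveLock(active, now))  // true
}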
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)