diff options
Diffstat (limited to 'weed/s3api/s3api_bucket_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_bucket_handlers.go | 158 |
1 files changed, 10 insertions, 148 deletions
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 2d67aa551..5ff155890 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -9,9 +9,7 @@ import ( "fmt" "math" "net/http" - "path" "sort" - "strconv" "strings" "time" @@ -336,7 +334,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque // If object lock is enabled, check for objects with active locks if bucketConfig.ObjectLockConfig != nil { - hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket) + hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket) if checkErr != nil { glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -400,158 +398,22 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque } // hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold -func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) { +// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go +func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) { bucketPath := s3a.option.BucketsPath + "/" + bucket + var hasLocks bool + var checkErr error - // Check all objects including versions for active locks - // Establish current time once at the start for consistency across the entire scan - hasLocks := false - currentTime := time.Now() - err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime) + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath) + return checkErr + }) if err != nil { - return false, fmt.Errorf("error checking for locked objects: %w", err) + return false, err } - return hasLocks, nil } -const ( - // lockCheckPaginationSize is the page size for listing directories during lock checks - lockCheckPaginationSize = 10000 -) - -// errStopPagination is a sentinel error to signal early termination of pagination -var errStopPagination = errors.New("stop pagination") - -// paginateEntries iterates through directory entries with pagination -// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully. -func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error { - startFrom := "" - for { - entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize) - if err != nil { - // Fail-safe: propagate error to prevent incorrect bucket deletion - return fmt.Errorf("failed to list directory %s: %w", dir, err) - } - - if err := fn(entries); err != nil { - if errors.Is(err, errStopPagination) { - return nil - } - return err - } - - if isLast || len(entries) == 0 { - break - } - // Use the last entry name as the start point for next page - startFrom = entries[len(entries)-1].Name - } - return nil -} - -// recursivelyCheckLocks recursively checks all objects and versions for active locks -// Uses pagination to handle directories with more than 10,000 entries -func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error { - if *hasLocks { - // Early exit if we've already found a locked object - return nil - } - - // Process entries in the current directory with pagination - err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error { - for _, entry := range entries { - if *hasLocks { - // Early exit if we've already found a locked object - return errStopPagination - } - - // Skip special directories (multipart uploads, etc) - if entry.Name == s3_constants.MultipartUploadsFolder { - continue - } - - if entry.IsDirectory { - subDir := path.Join(dir, entry.Name) - if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { - // If it's a .versions directory, check all version files with pagination - err := s3a.paginateEntries(subDir, func(versionEntries []*filer_pb.Entry) error { - for _, versionEntry := range versionEntries { - if s3a.entryHasActiveLock(versionEntry, currentTime) { - *hasLocks = true - glog.V(2).Infof("Found object with active lock in versions: %s/%s", subDir, versionEntry.Name) - return errStopPagination - } - } - return nil - }) - if err != nil { - return err - } - } else { - // Recursively check other subdirectories - subRelativePath := path.Join(relativePath, entry.Name) - if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil { - return err - } - // Early exit if a locked object was found in the subdirectory - if *hasLocks { - return errStopPagination - } - } - } else { - // Check regular files for locks - if s3a.entryHasActiveLock(entry, currentTime) { - *hasLocks = true - objectPath := path.Join(relativePath, entry.Name) - glog.V(2).Infof("Found object with active lock: %s", objectPath) - return errStopPagination - } - } - } - return nil - }) - - return err -} - -// entryHasActiveLock checks if an entry has an active retention or legal hold -func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool { - if entry.Extended == nil { - return false - } - - // Check for active legal hold - if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists { - if string(legalHoldBytes) == s3_constants.LegalHoldOn { - return true - } - } - - // Check for active retention - if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists { - mode := string(modeBytes) - if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance { - // Check if retention is still active - if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists { - timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64) - if err != nil { - // Fail-safe: if we can't parse the retention date, assume the object is locked - // to prevent accidental data loss - glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err) - return true - } - retainUntil := time.Unix(timestamp, 0) - if retainUntil.After(currentTime) { - return true - } - } - } - } - - return false -} - func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) |
