diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-13 13:41:25 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-13 13:41:25 -0800 |
| commit | f77e6ed2d4bb228187c492b9e5c04ba09362b0a6 (patch) | |
| tree | 795131eef7d56fe01ecad5b703f6f68be93b6cdd /weed/s3api/s3api_bucket_handlers.go | |
| parent | d80d8be01208ffe82861b8aa50d4baf599fdfa02 (diff) | |
| download | seaweedfs-f77e6ed2d4bb228187c492b9e5c04ba09362b0a6.tar.xz seaweedfs-f77e6ed2d4bb228187c492b9e5c04ba09362b0a6.zip | |
fix: admin UI bucket delete now properly deletes collection and checks Object Lock (#7734)
* fix: admin UI bucket delete now properly deletes collection and checks Object Lock
Fixes #7711
The admin UI's DeleteS3Bucket function was missing two critical behaviors:
1. It did not delete the collection from the master (unlike s3.bucket.delete
shell command), leaving orphaned volume data that caused fs.verify errors.
2. It did not check for Object Lock protections before deletion, potentially
allowing deletion of buckets with locked objects.
Changes:
- Add shared Object Lock checking utilities to object_lock_utils.go:
- EntryHasActiveLock: standalone function to check if an entry has active lock
- HasObjectsWithActiveLocks: shared function to scan bucket for locked objects
- Refactor S3 API entryHasActiveLock to use shared EntryHasActiveLock function
- Update admin UI DeleteS3Bucket to:
- Check Object Lock using shared HasObjectsWithActiveLocks utility
- Delete the collection before deleting filer entries (matching s3.bucket.delete)
* refactor: S3 API uses shared Object Lock utilities
Removes 114 lines of duplicated code from s3api_bucket_handlers.go by
having hasObjectsWithActiveLocks delegate to the shared HasObjectsWithActiveLocks
function in object_lock_utils.go.
Now both S3 API and Admin UI use the same shared utilities:
- EntryHasActiveLock
- HasObjectsWithActiveLocks
- recursivelyCheckLocksWithClient
- checkVersionsForLocksWithClient
* feat: s3.bucket.delete shell command now checks Object Lock
Add Object Lock protection to the s3.bucket.delete shell command.
If the bucket has Object Lock enabled and contains objects with active
retention or legal hold, deletion is prevented.
Also refactors Object Lock checking utilities into a new s3_objectlock
package to avoid import cycles between shell, s3api, and admin packages.
All three components now share the same logic:
- S3 API (DeleteBucketHandler)
- Admin UI (DeleteS3Bucket)
- Shell command (s3.bucket.delete)
* refactor: unified Object Lock checking and consistent deletion parameters
1. Add CheckBucketForLockedObjects() - a unified function that combines:
- Bucket entry lookup
- Object Lock enabled check
- Scan for locked objects
2. All three components now use this single function:
- S3 API (via s3api.CheckBucketForLockedObjects)
- Admin UI (via s3api.CheckBucketForLockedObjects)
- Shell command (via s3_objectlock.CheckBucketForLockedObjects)
3. Aligned deletion parameters across all components:
- isDeleteData: false (collection already deleted separately)
- isRecursive: true
- ignoreRecursiveError: true
* fix: properly handle non-EOF errors in Recv() loops
The Recv() loops in recursivelyCheckLocksWithClient and
checkVersionsForLocksWithClient were breaking on any error, which
could hide real stream errors and incorrectly report 'no locks found'.
Now:
- io.EOF: break loop (normal end of stream)
- any other error: return it so caller knows the stream failed
* fix: address PR review comments
1. Add path traversal protection - validate entry names before building
subdirectory paths. Skip entries with empty names, '.', '..', or
containing path separators.
2. Use exact match for .versions folder instead of HasSuffix() to avoid
mismatching unrelated directories like 'foo.versions'.
3. Replace path.Join with simple string concatenation since we now
validate entry names.
* refactor: extract paginateEntries helper to reduce duplication
The recursivelyCheckLocksWithClient and checkVersionsForLocksWithClient
functions shared significant structural similarity. Extracted a generic
paginateEntries helper that:
- Handles pagination logic (lastFileName tracking, Limit)
- Handles stream receiving with proper EOF vs error handling
- Validates entry names (path traversal protection)
- Calls a processEntry callback for business logic
This centralizes pagination logic and makes the code more maintainable.
* feat: add context propagation for timeout and cancellation support
All Object Lock checking functions now accept context.Context parameter:
- paginateEntries(ctx, client, dir, processEntry)
- recursivelyCheckLocksWithClient(ctx, client, dir, hasLocks, currentTime)
- checkVersionsForLocksWithClient(ctx, client, versionsDir, hasLocks, currentTime)
- HasObjectsWithActiveLocks(ctx, client, bucketPath)
- CheckBucketForLockedObjects(ctx, client, bucketsPath, bucketName)
This enables:
- Timeout support for large bucket scans
- Cancellation propagation from HTTP requests
- The S3 API handler now uses r.Context() for proper request lifecycle
* fix: address PR review comments
1. Add DefaultBucketsPath constant in admin_server.go instead of
hardcoding "/buckets" in multiple places.
2. Add defensive normalization in EntryHasActiveLock:
- TrimSpace to handle whitespace around values
- ToUpper for case-insensitive comparison of legal hold and
retention mode values
- TrimSpace on retention date before parsing
* fix: use ctx variable consistently instead of context.Background()
In both DeleteS3Bucket and command_s3_bucket_delete, use the ctx
variable defined at the start of the function for all gRPC calls
instead of creating new context.Background() instances.
Diffstat (limited to 'weed/s3api/s3api_bucket_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_bucket_handlers.go | 158 |
1 files changed, 10 insertions, 148 deletions
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 2d67aa551..5ff155890 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -9,9 +9,7 @@ import ( "fmt" "math" "net/http" - "path" "sort" - "strconv" "strings" "time" @@ -336,7 +334,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque // If object lock is enabled, check for objects with active locks if bucketConfig.ObjectLockConfig != nil { - hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket) + hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket) if checkErr != nil { glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -400,158 +398,22 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque } // hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold -func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) { +// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go +func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) { bucketPath := s3a.option.BucketsPath + "/" + bucket + var hasLocks bool + var checkErr error - // Check all objects including versions for active locks - // Establish current time once at the start for consistency across the entire scan - hasLocks := false - currentTime := time.Now() - err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime) + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath) + return checkErr + }) if err != nil { - return false, fmt.Errorf("error checking for locked objects: %w", err) + return false, err } - return hasLocks, nil } -const ( - // lockCheckPaginationSize is the page size for listing directories during lock checks - lockCheckPaginationSize = 10000 -) - -// errStopPagination is a sentinel error to signal early termination of pagination -var errStopPagination = errors.New("stop pagination") - -// paginateEntries iterates through directory entries with pagination -// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully. -func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error { - startFrom := "" - for { - entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize) - if err != nil { - // Fail-safe: propagate error to prevent incorrect bucket deletion - return fmt.Errorf("failed to list directory %s: %w", dir, err) - } - - if err := fn(entries); err != nil { - if errors.Is(err, errStopPagination) { - return nil - } - return err - } - - if isLast || len(entries) == 0 { - break - } - // Use the last entry name as the start point for next page - startFrom = entries[len(entries)-1].Name - } - return nil -} - -// recursivelyCheckLocks recursively checks all objects and versions for active locks -// Uses pagination to handle directories with more than 10,000 entries -func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error { - if *hasLocks { - // Early exit if we've already found a locked object - return nil - } - - // Process entries in the current directory with pagination - err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error { - for _, entry := range entries { - if *hasLocks { - // Early exit if we've already found a locked object - return errStopPagination - } - - // Skip special directories (multipart uploads, etc) - if entry.Name == s3_constants.MultipartUploadsFolder { - continue - } - - if entry.IsDirectory { - subDir := path.Join(dir, entry.Name) - if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { - // If it's a .versions directory, check all version files with pagination - err := s3a.paginateEntries(subDir, func(versionEntries []*filer_pb.Entry) error { - for _, versionEntry := range versionEntries { - if s3a.entryHasActiveLock(versionEntry, currentTime) { - *hasLocks = true - glog.V(2).Infof("Found object with active lock in versions: %s/%s", subDir, versionEntry.Name) - return errStopPagination - } - } - return nil - }) - if err != nil { - return err - } - } else { - // Recursively check other subdirectories - subRelativePath := path.Join(relativePath, entry.Name) - if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil { - return err - } - // Early exit if a locked object was found in the subdirectory - if *hasLocks { - return errStopPagination - } - } - } else { - // Check regular files for locks - if s3a.entryHasActiveLock(entry, currentTime) { - *hasLocks = true - objectPath := path.Join(relativePath, entry.Name) - glog.V(2).Infof("Found object with active lock: %s", objectPath) - return errStopPagination - } - } - } - return nil - }) - - return err -} - -// entryHasActiveLock checks if an entry has an active retention or legal hold -func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool { - if entry.Extended == nil { - return false - } - - // Check for active legal hold - if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists { - if string(legalHoldBytes) == s3_constants.LegalHoldOn { - return true - } - } - - // Check for active retention - if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists { - mode := string(modeBytes) - if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance { - // Check if retention is still active - if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists { - timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64) - if err != nil { - // Fail-safe: if we can't parse the retention date, assume the object is locked - // to prevent accidental data loss - glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err) - return true - } - retainUntil := time.Unix(timestamp, 0) - if retainUntil.After(currentTime) { - return true - } - } - } - } - - return false -} - func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) { bucket, _ := s3_constants.GetBucketAndObject(r) |
