-rw-r--r--  weed/admin/dash/admin_server.go               |  41
-rw-r--r--  weed/s3api/object_lock_utils.go               |  27
-rw-r--r--  weed/s3api/s3_objectlock/object_lock_check.go | 232
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go           | 158
-rw-r--r--  weed/shell/command_s3_bucket_delete.go        |  16
5 files changed, 316 insertions, 158 deletions
diff --git a/weed/admin/dash/admin_server.go b/weed/admin/dash/admin_server.go
index eeeccf981..c62b9da07 100644
--- a/weed/admin/dash/admin_server.go
+++ b/weed/admin/dash/admin_server.go
@@ -30,6 +30,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
)
+const (
+ // DefaultBucketsPath is the default path for S3 buckets in the filer
+ DefaultBucketsPath = "/buckets"
+)
+
type AdminServer struct {
masterClient *wdclient.MasterClient
templateFS http.FileSystem
@@ -271,7 +276,7 @@ func (s *AdminServer) GetS3Buckets() ([]S3Bucket, error) {
err = s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// List buckets by looking at the /buckets directory
stream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
- Directory: "/buckets",
+ Directory: DefaultBucketsPath,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
@@ -381,7 +386,7 @@ func (s *AdminServer) GetBucketDetails(bucketName string) (*BucketDetails, error
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// Get bucket info
bucketResp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
- Directory: "/buckets",
+ Directory: DefaultBucketsPath,
Name: bucketName,
})
if err != nil {
@@ -506,14 +511,36 @@ func (s *AdminServer) CreateS3Bucket(bucketName string) error {
// DeleteS3Bucket deletes an S3 bucket and all its contents
func (s *AdminServer) DeleteS3Bucket(bucketName string) error {
+	// First, check whether the bucket has Object Lock enabled and, if so, whether it holds locked objects
+ ctx := context.Background()
+ err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return s3api.CheckBucketForLockedObjects(ctx, client, DefaultBucketsPath, bucketName)
+ })
+ if err != nil {
+ return err
+ }
+
+	// Delete the collection first (same as the s3.bucket.delete shell command)
+ // This ensures volume data is cleaned up properly
+ err = s.WithMasterClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ Name: bucketName,
+ })
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("failed to delete collection: %w", err)
+ }
+
+	// Then delete the bucket directory recursively from the filer,
+	// using the same parameters as the s3.bucket.delete shell command and the S3 API
return s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- // Delete bucket directory recursively
- _, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
- Directory: "/buckets",
+ _, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{
+ Directory: DefaultBucketsPath,
Name: bucketName,
- IsDeleteData: true,
+ IsDeleteData: false, // Collection already deleted, just remove metadata
IsRecursive: true,
- IgnoreRecursiveError: false,
+ IgnoreRecursiveError: true, // Same as S3 API and shell command
})
if err != nil {
return fmt.Errorf("failed to delete bucket: %w", err)
diff --git a/weed/s3api/object_lock_utils.go b/weed/s3api/object_lock_utils.go
index 39496e14f..6b00d8595 100644
--- a/weed/s3api/object_lock_utils.go
+++ b/weed/s3api/object_lock_utils.go
@@ -1,6 +1,7 @@
package s3api
import (
+ "context"
"encoding/xml"
"fmt"
"strconv"
@@ -9,6 +10,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_objectlock"
)
// ObjectLockUtils provides shared utilities for Object Lock configuration
@@ -361,3 +363,28 @@ func validateDefaultRetention(retention *DefaultRetention) error {
return nil
}
+
+// ====================================================================
+// SHARED OBJECT LOCK CHECKING FUNCTIONS
+// ====================================================================
+// These functions delegate to the s3_objectlock package to avoid code duplication.
+// They are kept here for backward compatibility with existing callers.
+
+// EntryHasActiveLock checks if an entry has an active retention or legal hold
+// Delegates to s3_objectlock.EntryHasActiveLock
+func EntryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
+ return s3_objectlock.EntryHasActiveLock(entry, currentTime)
+}
+
+// HasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
+// Delegates to s3_objectlock.HasObjectsWithActiveLocks
+func HasObjectsWithActiveLocks(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketPath string) (bool, error) {
+ return s3_objectlock.HasObjectsWithActiveLocks(ctx, client, bucketPath)
+}
+
+// CheckBucketForLockedObjects is a unified function that checks if a bucket has Object Lock enabled
+// and if so, scans for objects with active locks.
+// Delegates to s3_objectlock.CheckBucketForLockedObjects
+func CheckBucketForLockedObjects(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketsPath, bucketName string) error {
+ return s3_objectlock.CheckBucketForLockedObjects(ctx, client, bucketsPath, bucketName)
+}
diff --git a/weed/s3api/s3_objectlock/object_lock_check.go b/weed/s3api/s3_objectlock/object_lock_check.go
new file mode 100644
index 000000000..a66e587c5
--- /dev/null
+++ b/weed/s3api/s3_objectlock/object_lock_check.go
@@ -0,0 +1,232 @@
+package s3_objectlock
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+)
+
+// ====================================================================
+// SHARED OBJECT LOCK CHECKING FUNCTIONS
+// ====================================================================
+// These functions are used by the S3 API, the Admin UI, and shell commands to
+// check Object Lock status before bucket deletion.
+
+// EntryHasActiveLock checks if an entry has an active retention or legal hold
+// This is a standalone function that can be used by any component.
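+//
+// A minimal sketch of an entry this would report as locked (hypothetical
+// values; the s3_constants strings are assumed to follow the AWS values
+// "ON", "GOVERNANCE", and "COMPLIANCE"):
+//
+//	entry.Extended[s3_constants.ExtObjectLockModeKey] = []byte("GOVERNANCE")
+//	entry.Extended[s3_constants.ExtRetentionUntilDateKey] = []byte("1893456000") // Unix seconds, 2030-01-01
+//	locked := EntryHasActiveLock(entry, time.Now()) // true until the retention date passes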
+func EntryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
+ if entry == nil || entry.Extended == nil {
+ return false
+ }
+
+ // Check for active legal hold (case-insensitive, trimmed for defensive parsing)
+ if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
+ legalHold := strings.TrimSpace(strings.ToUpper(string(legalHoldBytes)))
+ if legalHold == s3_constants.LegalHoldOn {
+ return true
+ }
+ }
+
+ // Check for active retention (case-insensitive, trimmed for defensive parsing)
+ if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
+ mode := strings.TrimSpace(strings.ToUpper(string(modeBytes)))
+ if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
+ // Check if retention is still active
+ if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
+ dateStr := strings.TrimSpace(string(dateBytes))
+ timestamp, err := strconv.ParseInt(dateStr, 10, 64)
+ if err != nil {
+ // Fail-safe: if we can't parse the retention date, assume the object is locked
+ // to prevent accidental data loss
+ glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", dateStr, err)
+ return true
+ }
+ retainUntil := time.Unix(timestamp, 0)
+ if retainUntil.After(currentTime) {
+ return true
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// HasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
+// This function uses the filer gRPC client to scan the bucket directory
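+// The scan short-circuits via the shared hasLocks flag, so traversal stops as
+// soon as the first locked object or version is found.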
+func HasObjectsWithActiveLocks(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketPath string) (bool, error) {
+ hasLocks := false
+ currentTime := time.Now()
+
+ err := recursivelyCheckLocksWithClient(ctx, client, bucketPath, &hasLocks, currentTime)
+ if err != nil {
+ return false, fmt.Errorf("error checking for locked objects: %w", err)
+ }
+
+ return hasLocks, nil
+}
+
+// paginateEntries is a generic helper that handles pagination logic for listing directory entries.
+// The processEntry callback is called for each entry; returning stop=true stops iteration early.
+func paginateEntries(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string,
+ processEntry func(entry *filer_pb.Entry) (stop bool, err error)) error {
+ lastFileName := ""
+ for {
+ resp, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
+ Directory: dir,
+ StartFromFileName: lastFileName,
+ InclusiveStartFrom: false,
+ Limit: 10000,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to list directory %s: %w", dir, err)
+ }
+
+ entriesReceived := false
+ for {
+ entryResp, recvErr := resp.Recv()
+ if recvErr != nil {
+ if errors.Is(recvErr, io.EOF) {
+ break // Normal end of stream
+ }
+ return fmt.Errorf("failed to receive entry from %s: %w", dir, recvErr)
+ }
+ entriesReceived = true
+ entry := entryResp.Entry
+ lastFileName = entry.Name
+
+ // Skip invalid entry names to prevent path traversal
+ if entry.Name == "" || entry.Name == "." || entry.Name == ".." ||
+ strings.ContainsAny(entry.Name, "/\\") {
+ glog.V(2).Infof("Skipping invalid entry name: %q in %s", entry.Name, dir)
+ continue
+ }
+
+ stop, err := processEntry(entry)
+ if err != nil {
+ return err
+ }
+ if stop {
+ return nil
+ }
+ }
+
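+		// Termination: the loop always ends by observing one empty page, since a
+		// page that yields no entries means the directory is exhausted. This
+		// trades one extra ListEntries round trip per directory for a simpler
+		// loop that never tracks per-page counts against Limit.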
+ if !entriesReceived {
+ break
+ }
+ }
+ return nil
+}
+
+// recursivelyCheckLocksWithClient recursively checks all objects and versions for active locks
+func recursivelyCheckLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string, hasLocks *bool, currentTime time.Time) error {
+ if *hasLocks {
+ return nil // Early exit if already found a locked object
+ }
+
+ return paginateEntries(ctx, client, dir, func(entry *filer_pb.Entry) (bool, error) {
+ if *hasLocks {
+ return true, nil // Stop iteration
+ }
+
+ // Skip special directories
+ if entry.Name == s3_constants.MultipartUploadsFolder {
+ return false, nil // Continue
+ }
+
+ if entry.IsDirectory {
+ subDir := dir + "/" + entry.Name
+ if entry.Name == s3_constants.VersionsFolder {
+ // Check all version files (exact match for .versions folder)
+ if err := checkVersionsForLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
+ return false, err
+ }
+ } else {
+ // Recursively check subdirectories
+ if err := recursivelyCheckLocksWithClient(ctx, client, subDir, hasLocks, currentTime); err != nil {
+ return false, err
+ }
+ }
+ } else {
+ // Check if this object has an active lock
+ if EntryHasActiveLock(entry, currentTime) {
+ *hasLocks = true
+ glog.V(2).Infof("Found object with active lock: %s/%s", dir, entry.Name)
+ return true, nil // Stop iteration
+ }
+ }
+ return false, nil // Continue
+ })
+}
+
+// checkVersionsForLocksWithClient checks all versions in a .versions directory for active locks
+func checkVersionsForLocksWithClient(ctx context.Context, client filer_pb.SeaweedFilerClient, versionsDir string, hasLocks *bool, currentTime time.Time) error {
+ return paginateEntries(ctx, client, versionsDir, func(entry *filer_pb.Entry) (bool, error) {
+ if *hasLocks {
+ return true, nil // Stop iteration
+ }
+
+ if EntryHasActiveLock(entry, currentTime) {
+ *hasLocks = true
+ glog.V(2).Infof("Found version with active lock: %s/%s", versionsDir, entry.Name)
+ return true, nil // Stop iteration
+ }
+ return false, nil // Continue
+ })
+}
+
+// IsObjectLockEnabled checks if Object Lock is enabled on a bucket entry
+func IsObjectLockEnabled(entry *filer_pb.Entry) bool {
+ if entry == nil || entry.Extended == nil {
+ return false
+ }
+
+ enabledBytes, exists := entry.Extended[s3_constants.ExtObjectLockEnabledKey]
+ if !exists {
+ return false
+ }
+
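+	// Accept both the canonical S3 value (s3_constants.ObjectLockEnabled) and a
+	// plain "true", which tolerates metadata written as a boolean string.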
+ enabled := string(enabledBytes)
+ return enabled == s3_constants.ObjectLockEnabled || enabled == "true"
+}
+
+// CheckBucketForLockedObjects is a unified function that checks if a bucket has Object Lock enabled
+// and, if so, scans for objects with active locks. It combines the bucket lookup and the lock
+// check into a single operation used by the S3 API, the Admin UI, and shell commands.
+// Returns an error if the bucket has locked objects or if the check fails.
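+//
+// Typical usage, mirroring the Admin UI and shell callers in this change:
+//
+//	err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+//		return s3_objectlock.CheckBucketForLockedObjects(ctx, client, "/buckets", bucketName)
+//	})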
+func CheckBucketForLockedObjects(ctx context.Context, client filer_pb.SeaweedFilerClient, bucketsPath, bucketName string) error {
+ // Look up the bucket entry
+ lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+ Directory: bucketsPath,
+ Name: bucketName,
+ })
+ if err != nil {
+ return fmt.Errorf("bucket not found: %w", err)
+ }
+
+ // Check if Object Lock is enabled
+ if !IsObjectLockEnabled(lookupResp.Entry) {
+ return nil // No Object Lock, nothing to check
+ }
+
+ // Check for objects with active locks
+ bucketPath := bucketsPath + "/" + bucketName
+ hasLockedObjects, checkErr := HasObjectsWithActiveLocks(ctx, client, bucketPath)
+ if checkErr != nil {
+ return fmt.Errorf("failed to check for locked objects: %w", checkErr)
+ }
+ if hasLockedObjects {
+ return fmt.Errorf("bucket has objects with active Object Lock retention or legal hold")
+ }
+
+ return nil
+}
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index 2d67aa551..5ff155890 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -9,9 +9,7 @@ import (
"fmt"
"math"
"net/http"
- "path"
"sort"
- "strconv"
"strings"
"time"
@@ -336,7 +334,7 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
// If object lock is enabled, check for objects with active locks
if bucketConfig.ObjectLockConfig != nil {
- hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(bucket)
+ hasLockedObjects, checkErr := s3a.hasObjectsWithActiveLocks(r.Context(), bucket)
if checkErr != nil {
glog.Errorf("DeleteBucketHandler: failed to check for locked objects in bucket %s: %v", bucket, checkErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -400,158 +398,22 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
}
// hasObjectsWithActiveLocks checks if any objects in the bucket have active retention or legal hold
-func (s3a *S3ApiServer) hasObjectsWithActiveLocks(bucket string) (bool, error) {
+// Delegates to the shared HasObjectsWithActiveLocks function in object_lock_utils.go
+func (s3a *S3ApiServer) hasObjectsWithActiveLocks(ctx context.Context, bucket string) (bool, error) {
bucketPath := s3a.option.BucketsPath + "/" + bucket
+ var hasLocks bool
+ var checkErr error
- // Check all objects including versions for active locks
- // Establish current time once at the start for consistency across the entire scan
- hasLocks := false
- currentTime := time.Now()
- err := s3a.recursivelyCheckLocks(bucketPath, "", &hasLocks, currentTime)
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ hasLocks, checkErr = HasObjectsWithActiveLocks(ctx, client, bucketPath)
+ return checkErr
+ })
if err != nil {
- return false, fmt.Errorf("error checking for locked objects: %w", err)
+ return false, err
}
-
return hasLocks, nil
}
-const (
- // lockCheckPaginationSize is the page size for listing directories during lock checks
- lockCheckPaginationSize = 10000
-)
-
-// errStopPagination is a sentinel error to signal early termination of pagination
-var errStopPagination = errors.New("stop pagination")
-
-// paginateEntries iterates through directory entries with pagination
-// Calls fn for each page of entries. If fn returns errStopPagination, iteration stops successfully.
-func (s3a *S3ApiServer) paginateEntries(dir string, fn func(entries []*filer_pb.Entry) error) error {
- startFrom := ""
- for {
- entries, isLast, err := s3a.list(dir, "", startFrom, false, lockCheckPaginationSize)
- if err != nil {
- // Fail-safe: propagate error to prevent incorrect bucket deletion
- return fmt.Errorf("failed to list directory %s: %w", dir, err)
- }
-
- if err := fn(entries); err != nil {
- if errors.Is(err, errStopPagination) {
- return nil
- }
- return err
- }
-
- if isLast || len(entries) == 0 {
- break
- }
- // Use the last entry name as the start point for next page
- startFrom = entries[len(entries)-1].Name
- }
- return nil
-}
-
-// recursivelyCheckLocks recursively checks all objects and versions for active locks
-// Uses pagination to handle directories with more than 10,000 entries
-func (s3a *S3ApiServer) recursivelyCheckLocks(dir string, relativePath string, hasLocks *bool, currentTime time.Time) error {
- if *hasLocks {
- // Early exit if we've already found a locked object
- return nil
- }
-
- // Process entries in the current directory with pagination
- err := s3a.paginateEntries(dir, func(entries []*filer_pb.Entry) error {
- for _, entry := range entries {
- if *hasLocks {
- // Early exit if we've already found a locked object
- return errStopPagination
- }
-
- // Skip special directories (multipart uploads, etc)
- if entry.Name == s3_constants.MultipartUploadsFolder {
- continue
- }
-
- if entry.IsDirectory {
- subDir := path.Join(dir, entry.Name)
- if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) {
- // If it's a .versions directory, check all version files with pagination
- err := s3a.paginateEntries(subDir, func(versionEntries []*filer_pb.Entry) error {
- for _, versionEntry := range versionEntries {
- if s3a.entryHasActiveLock(versionEntry, currentTime) {
- *hasLocks = true
- glog.V(2).Infof("Found object with active lock in versions: %s/%s", subDir, versionEntry.Name)
- return errStopPagination
- }
- }
- return nil
- })
- if err != nil {
- return err
- }
- } else {
- // Recursively check other subdirectories
- subRelativePath := path.Join(relativePath, entry.Name)
- if err := s3a.recursivelyCheckLocks(subDir, subRelativePath, hasLocks, currentTime); err != nil {
- return err
- }
- // Early exit if a locked object was found in the subdirectory
- if *hasLocks {
- return errStopPagination
- }
- }
- } else {
- // Check regular files for locks
- if s3a.entryHasActiveLock(entry, currentTime) {
- *hasLocks = true
- objectPath := path.Join(relativePath, entry.Name)
- glog.V(2).Infof("Found object with active lock: %s", objectPath)
- return errStopPagination
- }
- }
- }
- return nil
- })
-
- return err
-}
-
-// entryHasActiveLock checks if an entry has an active retention or legal hold
-func (s3a *S3ApiServer) entryHasActiveLock(entry *filer_pb.Entry, currentTime time.Time) bool {
- if entry.Extended == nil {
- return false
- }
-
- // Check for active legal hold
- if legalHoldBytes, exists := entry.Extended[s3_constants.ExtLegalHoldKey]; exists {
- if string(legalHoldBytes) == s3_constants.LegalHoldOn {
- return true
- }
- }
-
- // Check for active retention
- if modeBytes, exists := entry.Extended[s3_constants.ExtObjectLockModeKey]; exists {
- mode := string(modeBytes)
- if mode == s3_constants.RetentionModeCompliance || mode == s3_constants.RetentionModeGovernance {
- // Check if retention is still active
- if dateBytes, dateExists := entry.Extended[s3_constants.ExtRetentionUntilDateKey]; dateExists {
- timestamp, err := strconv.ParseInt(string(dateBytes), 10, 64)
- if err != nil {
- // Fail-safe: if we can't parse the retention date, assume the object is locked
- // to prevent accidental data loss
- glog.Warningf("Failed to parse retention date '%s' for entry, assuming locked: %v", string(dateBytes), err)
- return true
- }
- retainUntil := time.Unix(timestamp, 0)
- if retainUntil.After(currentTime) {
- return true
- }
- }
- }
- }
-
- return false
-}
-
func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
bucket, _ := s3_constants.GetBucketAndObject(r)
diff --git a/weed/shell/command_s3_bucket_delete.go b/weed/shell/command_s3_bucket_delete.go
index 0227151fe..ddd3201e9 100644
--- a/weed/shell/command_s3_bucket_delete.go
+++ b/weed/shell/command_s3_bucket_delete.go
@@ -4,10 +4,11 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"io"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_objectlock"
)
func init() {
@@ -55,9 +56,18 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
return fmt.Errorf("read buckets: %w", err)
}
+	// Check whether the bucket has Object Lock enabled and, if so, whether it holds locked objects
+ ctx := context.Background()
+ err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return s3_objectlock.CheckBucketForLockedObjects(ctx, client, filerBucketsPath, *bucketName)
+ })
+ if err != nil {
+ return err
+ }
+
// delete the collection directly first
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- _, err = client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ _, err = client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
Name: getCollectionName(commandEnv, *bucketName),
})
return err
@@ -66,6 +76,6 @@ func (c *commandS3BucketDelete) Do(args []string, commandEnv *CommandEnv, writer
return
}
- return filer_pb.Remove(context.Background(), commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil)
+ return filer_pb.Remove(ctx, commandEnv, filerBucketsPath, *bucketName, false, true, true, false, nil)
}