Diffstat (limited to 'weed')
-rw-r--r--  weed/filer/entry.go                                    24
-rw-r--r--  weed/filer/filer.go                                   133
-rw-r--r--  weed/pb/filer_pb/filer_client.go                       56
-rw-r--r--  weed/pb/filer_pb/filer_pb_helper.go                    26
-rw-r--r--  weed/s3api/filer_multipart.go                          19
-rw-r--r--  weed/s3api/filer_util.go                              107
-rw-r--r--  weed/s3api/s3_constants/extend_key.go                   1
-rw-r--r--  weed/s3api/s3_constants/header.go                       1
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go                    12
-rw-r--r--  weed/s3api/s3api_object_handlers.go                     2
-rw-r--r--  weed/s3api/s3api_object_handlers_delete.go             89
-rw-r--r--  weed/s3api/s3api_object_handlers_put.go                 3
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go   15
-rw-r--r--  weed/util/constants_lifecycle_interval_10sec.go          8
-rw-r--r--  weed/util/constants_lifecycle_interval_day.go            8
15 files changed, 439 insertions, 65 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index 5bd1a3c56..4757d5c9e 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -1,6 +1,7 @@
package filer
import (
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"os"
"time"
@@ -143,3 +144,26 @@ func maxUint64(x, y uint64) uint64 {
}
return y
}
+
+func (entry *Entry) IsExpireS3Enabled() (exist bool) {
+ if entry.Extended != nil {
+ _, exist = entry.Extended[s3_constants.SeaweedFSExpiresS3]
+ }
+ return exist
+}
+
+func (entry *Entry) IsS3Versioning() (exist bool) {
+ if entry.Extended != nil {
+ _, exist = entry.Extended[s3_constants.ExtVersionIdKey]
+ }
+ return exist
+}
+
+func (entry *Entry) GetS3ExpireTime() (expireTime time.Time) {
+ if entry.Mtime.IsZero() {
+ expireTime = entry.Crtime
+ } else {
+ expireTime = entry.Mtime
+ }
+ return expireTime.Add(time.Duration(entry.TtlSec) * time.Second)
+}
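
Note on the helpers above: lifecycle-flagged entries expire relative to the modification time (falling back to the creation time when Mtime is zero), while plain TTL entries keep the old creation-time behavior in FindEntry. A minimal, self-contained sketch of that computation, using simplified field types rather than the real filer.Entry struct:

    package main

    import (
        "fmt"
        "time"
    )

    // attrs stands in for the handful of Entry fields the helper reads.
    type attrs struct {
        Crtime time.Time
        Mtime  time.Time
        TtlSec int32
    }

    // s3ExpireTime mirrors Entry.GetS3ExpireTime: anchor the TTL on Mtime,
    // falling back to Crtime when the entry was never modified.
    func s3ExpireTime(a attrs) time.Time {
        base := a.Mtime
        if base.IsZero() {
            base = a.Crtime
        }
        return base.Add(time.Duration(a.TtlSec) * time.Second)
    }

    func main() {
        a := attrs{
            Crtime: time.Now().Add(-48 * time.Hour),
            Mtime:  time.Now().Add(-1 * time.Hour),
            TtlSec: 86400, // a 1-day lifecycle rule
        }
        // Mtime resets the clock, so the entry is not yet expired.
        fmt.Println("expired:", time.Now().After(s3ExpireTime(a)))
    }
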
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index b86ac3c5b..d3d2de948 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -351,37 +351,162 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ if entry.IsExpireS3Enabled() {
+ if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
+ glog.ErrorfCtx(ctx, "FindEntry doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
+ }
+ return nil, filer_pb.ErrNotFound
+ }
+ } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
return nil, filer_pb.ErrNotFound
}
}
- return
+ return entry, err
}
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
+ // Collect expired entries during iteration to avoid deadlock with DB connection pool
+ var expiredEntries []*Entry
+ var s3ExpiredEntries []*Entry
+ var hasValidEntries bool
+
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
select {
case <-ctx.Done():
return false
default:
if entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteOneEntry(ctx, entry)
+ if entry.IsExpireS3Enabled() {
+ if entry.GetS3ExpireTime().Before(time.Now()) && !entry.IsS3Versioning() {
+ // Collect for deletion after iteration completes to avoid DB deadlock
+ s3ExpiredEntries = append(s3ExpiredEntries, entry)
+ expiredCount++
+ return true
+ }
+ } else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ // Collect for deletion after iteration completes to avoid DB deadlock
+ expiredEntries = append(expiredEntries, entry)
expiredCount++
return true
}
}
+ // Track that we found at least one valid (non-expired) entry
+ hasValidEntries = true
return eachEntryFunc(entry)
}
})
if err != nil {
return expiredCount, lastFileName, err
}
+
+ // Delete expired entries after iteration completes to avoid DB connection deadlock
+ if len(s3ExpiredEntries) > 0 || len(expiredEntries) > 0 {
+ for _, entry := range s3ExpiredEntries {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, entry, true, false, nil); delErr != nil {
+ glog.ErrorfCtx(ctx, "doListDirectoryEntries doDeleteEntryMetaAndData %s failed: %v", entry.FullPath, delErr)
+ }
+ }
+ for _, entry := range expiredEntries {
+ if delErr := f.Store.DeleteOneEntry(ctx, entry); delErr != nil {
+ glog.ErrorfCtx(ctx, "doListDirectoryEntries DeleteOneEntry %s failed: %v", entry.FullPath, delErr)
+ }
+ }
+
+ // After expiring entries, the directory might be empty.
+ // Attempt to clean it up and any empty parent directories.
+ if !hasValidEntries && p != "/" && startFileName == "" {
+ stopAtPath := util.FullPath(f.DirBucketsPath)
+ f.DeleteEmptyParentDirectories(ctx, p, stopAtPath)
+ }
+ }
+
return
}
+// DeleteEmptyParentDirectories recursively checks and deletes parent directories if they become empty.
+// It stops at root "/" or at stopAtPath (if provided).
+// This is useful for cleaning up directories after deleting files or expired entries.
+//
+// IMPORTANT: For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
+// This prevents accidental deletion of directories outside the intended scope (e.g., outside bucket paths).
+//
+// Example usage:
+//
+// // After deleting /bucket/dir/subdir/file.txt, clean up empty parent directories
+// // but stop at the bucket path
+// parentPath := util.FullPath("/bucket/dir/subdir")
+// filer.DeleteEmptyParentDirectories(ctx, parentPath, util.FullPath("/bucket"))
+//
+// Example with gRPC client:
+//
+// if err := pb_filer_client.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+// return filer_pb.Traverse(ctx, filer, parentPath, "", func(entry *filer_pb.Entry) error {
+// // Process entries...
+// })
+// }); err == nil {
+// filer.DeleteEmptyParentDirectories(ctx, parentPath, stopPath)
+// }
+func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.FullPath, stopAtPath util.FullPath) {
+ if dirPath == "/" || dirPath == stopAtPath {
+ return
+ }
+
+ // Safety check: if stopAtPath is provided, dirPath must be under it (root "/" allows everything)
+ stopStr := string(stopAtPath)
+ if stopAtPath != "" && stopStr != "/" && !strings.HasPrefix(string(dirPath)+"/", stopStr+"/") {
+ glog.V(1).InfofCtx(ctx, "DeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
+ return
+ }
+
+ // Additional safety: prevent deletion of bucket-level directories
+ // This protects /buckets/mybucket from being deleted even if empty
+ baseDepth := strings.Count(f.DirBucketsPath, "/")
+ dirDepth := strings.Count(string(dirPath), "/")
+ if dirDepth <= baseDepth+1 {
+ glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: skipping deletion of bucket-level directory %s", dirPath)
+ return
+ }
+
+ // Check if directory is empty
+ isEmpty, err := f.IsDirectoryEmpty(ctx, dirPath)
+ if err != nil {
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
+ return
+ }
+
+ if !isEmpty {
+ // Directory is not empty, stop checking upward
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
+ return
+ }
+
+ // Directory is empty, try to delete it
+ glog.V(2).InfofCtx(ctx, "DeleteEmptyParentDirectories: deleting empty directory %s", dirPath)
+ parentDir, _ := dirPath.DirAndName()
+ if dirEntry, findErr := f.FindEntry(ctx, dirPath); findErr == nil {
+ if delErr := f.doDeleteEntryMetaAndData(ctx, dirEntry, false, false, nil); delErr == nil {
+ // Successfully deleted, continue checking upwards
+ f.DeleteEmptyParentDirectories(ctx, util.FullPath(parentDir), stopAtPath)
+ } else {
+ // Failed to delete, stop cleanup
+ glog.V(3).InfofCtx(ctx, "DeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, delErr)
+ }
+ }
+}
+
+// IsDirectoryEmpty checks if a directory contains any entries
+func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) {
+ isEmpty := true
+ _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool {
+ isEmpty = false
+ return false // Stop after first entry
+ })
+ return isEmpty, err
+}
+
func (f *Filer) Shutdown() {
close(f.deletionQuit)
f.LocalMetaLogBuffer.ShutdownLogBuffer()
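
The listing path above deliberately collects expired entries and deletes them only after ListDirectoryPrefixedEntries returns, since deleting from inside the callback could deadlock the store's connection pool. A generic, hedged sketch of that collect-then-delete shape (listFn and deleteFn are placeholders, not SeaweedFS APIs):

    package main

    import "fmt"

    // collectThenDelete sketches the pattern used in doListDirectoryEntries:
    // never mutate the store from inside the iteration callback; gather the
    // expired names first and delete them once the listing has finished.
    func collectThenDelete(
        listFn func(each func(name string, expired bool) bool),
        deleteFn func(name string) error,
    ) {
        var expired []string
        listFn(func(name string, isExpired bool) bool {
            if isExpired {
                expired = append(expired, name) // collect only; no store calls here
            }
            return true // keep iterating
        })
        for _, name := range expired { // safe: the listing has released its connection
            if err := deleteFn(name); err != nil {
                fmt.Println("delete failed:", name, err)
            }
        }
    }

    func main() {
        entries := map[string]bool{"old.txt": true, "fresh.txt": false}
        collectThenDelete(
            func(each func(string, bool) bool) {
                for n, exp := range entries {
                    if !each(n, exp) {
                        return
                    }
                }
            },
            func(n string) error { delete(entries, n); return nil },
        )
        fmt.Println(entries) // only fresh.txt remains
    }
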
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 80adab292..17953c67d 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -308,3 +308,59 @@ func DoRemove(ctx context.Context, client SeaweedFilerClient, parentDirectoryPat
return nil
}
+
+// DoDeleteEmptyParentDirectories recursively deletes empty parent directories.
+// It stops at root "/" or at stopAtPath.
+// For safety, dirPath must be under stopAtPath (when stopAtPath is provided).
+// The checked map tracks already-processed directories to avoid redundant work in batch operations.
+func DoDeleteEmptyParentDirectories(ctx context.Context, client SeaweedFilerClient, dirPath util.FullPath, stopAtPath util.FullPath, checked map[string]bool) {
+ if dirPath == "/" || dirPath == stopAtPath {
+ return
+ }
+
+ // Skip if already checked (for batch delete optimization)
+ dirPathStr := string(dirPath)
+ if checked != nil {
+ if checked[dirPathStr] {
+ return
+ }
+ checked[dirPathStr] = true
+ }
+
+ // Safety check: if stopAtPath is provided, dirPath must be under it (root "/" allows everything)
+ stopStr := string(stopAtPath)
+ if stopAtPath != "" && stopStr != "/" && !strings.HasPrefix(dirPathStr+"/", stopStr+"/") {
+ glog.V(1).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: %s is not under %s, skipping", dirPath, stopAtPath)
+ return
+ }
+
+ // Check if directory is empty by listing with limit 1
+ isEmpty := true
+ err := SeaweedList(ctx, client, dirPathStr, "", func(entry *Entry, isLast bool) error {
+ isEmpty = false
+ return io.EOF // Use sentinel error to explicitly stop iteration
+ }, "", false, 1)
+
+ if err != nil && err != io.EOF {
+ glog.V(3).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: error checking %s: %v", dirPath, err)
+ return
+ }
+
+ if !isEmpty {
+ // Directory is not empty, stop checking upward
+ glog.V(3).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: directory %s is not empty, stopping cleanup", dirPath)
+ return
+ }
+
+ // Directory is empty, try to delete it
+ glog.V(2).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: deleting empty directory %s", dirPath)
+ parentDir, dirName := dirPath.DirAndName()
+
+ if err := DoRemove(ctx, client, parentDir, dirName, false, false, false, false, nil); err == nil {
+ // Successfully deleted, continue checking upwards
+ DoDeleteEmptyParentDirectories(ctx, client, util.FullPath(parentDir), stopAtPath, checked)
+ } else {
+ // Failed to delete, stop cleanup
+ glog.V(3).InfofCtx(ctx, "DoDeleteEmptyParentDirectories: failed to delete %s: %v", dirPath, err)
+ }
+}
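
A hypothetical caller of the new helper (not part of this diff): after a batch of object deletions, sweep each parent directory upward, sharing one checked map so a directory is examined at most once per batch. The bucket path is made up, and ctx and client are assumed to come from the surrounding WithFilerClient call:

    // Sweep parents of several deleted objects; stop at the bucket path.
    checked := make(map[string]bool)
    for _, dir := range []util.FullPath{
        "/buckets/photos/2024/01",
        "/buckets/photos/2024/02",
    } {
        filer_pb.DoDeleteEmptyParentDirectories(ctx, client, dir, util.FullPath("/buckets/photos"), checked)
    }
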
diff --git a/weed/pb/filer_pb/filer_pb_helper.go b/weed/pb/filer_pb/filer_pb_helper.go
index b5fd4e1e0..c8dd19d59 100644
--- a/weed/pb/filer_pb/filer_pb_helper.go
+++ b/weed/pb/filer_pb/filer_pb_helper.go
@@ -9,6 +9,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/viant/ptrie"
"google.golang.org/protobuf/proto"
@@ -24,6 +25,31 @@ func (entry *Entry) IsDirectoryKeyObject() bool {
return entry.IsDirectory && entry.Attributes != nil && entry.Attributes.Mime != ""
}
+func (entry *Entry) GetExpiryTime() (expiryTime int64) {
+ // For S3 objects with lifecycle expiration, use Mtime (modification time)
+ // For regular TTL entries, use Crtime (creation time) for backward compatibility
+ if entry.Extended != nil {
+ if _, hasS3Expiry := entry.Extended[s3_constants.SeaweedFSExpiresS3]; hasS3Expiry {
+ // S3 lifecycle expiration: base TTL on modification time
+ expiryTime = entry.Attributes.Mtime
+ if expiryTime == 0 {
+ expiryTime = entry.Attributes.Crtime
+ }
+ expiryTime += int64(entry.Attributes.TtlSec)
+ return expiryTime
+ }
+ }
+
+ // Regular TTL expiration: base on creation time only
+ expiryTime = entry.Attributes.Crtime + int64(entry.Attributes.TtlSec)
+ return expiryTime
+}
+
+func (entry *Entry) IsExpired() bool {
+ return entry != nil && entry.Attributes != nil && entry.Attributes.TtlSec > 0 &&
+ time.Now().Unix() >= entry.GetExpiryTime()
+}
+
func (entry *Entry) FileMode() (fileMode os.FileMode) {
if entry != nil && entry.Attributes != nil {
fileMode = os.FileMode(entry.Attributes.FileMode)
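
On the protobuf side the same rule is expressed in Unix seconds. A caller that already holds a *filer_pb.Entry (the lookup itself is assumed, not shown in this diff) can check it like this:

    // entry is a *filer_pb.Entry from a prior lookup (assumed).
    if entry.IsExpired() {
        // GetExpiryTime is Mtime+TtlSec for lifecycle-flagged entries and
        // Crtime+TtlSec otherwise, both in Unix seconds.
        glog.V(2).Infof("entry %s expired at %d", entry.Name, entry.GetExpiryTime())
    }
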
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index cb4c73692..c4c07f0c7 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -55,8 +55,7 @@ func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateM
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
- entry.Extended["key"] = []byte(*input.Key)
-
+ entry.Extended[s3_constants.ExtMultipartObjectKey] = []byte(*input.Key)
// Set object owner for multipart upload
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
@@ -173,6 +172,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
deleteEntries := []*filer_pb.Entry{}
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
entityTooSmall := false
+ entityWithTtl := false
for _, entry := range entries {
foundEntry := false
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
@@ -212,6 +212,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
foundEntry = true
}
if foundEntry {
+ if !entityWithTtl && entry.Attributes != nil && entry.Attributes.TtlSec > 0 {
+ entityWithTtl = true
+ }
if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] &&
entry.Attributes.FileSize < multiPartMinSize {
glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name)
@@ -330,7 +333,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
for k, v := range pentry.Extended {
- if k != "key" {
+ if k != s3_constants.ExtMultipartObjectKey {
versionEntry.Extended[k] = v
}
}
@@ -392,7 +395,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
for k, v := range pentry.Extended {
- if k != "key" {
+ if k != s3_constants.ExtMultipartObjectKey {
entry.Extended[k] = v
}
}
@@ -445,7 +448,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
for k, v := range pentry.Extended {
- if k != "key" {
+ if k != s3_constants.ExtMultipartObjectKey {
entry.Extended[k] = v
}
}
@@ -468,6 +471,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
+ // Set TTL-based S3 expiry (modification time)
+ if entityWithTtl {
+ entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+ }
})
if err != nil {
@@ -587,7 +594,7 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
uploadsCount := int64(0)
for _, entry := range entries {
if entry.Extended != nil {
- key := string(entry.Extended["key"])
+ key := string(entry.Extended[s3_constants.ExtMultipartObjectKey])
if *input.KeyMarker != "" && *input.KeyMarker != key {
continue
}
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 9dd9a684e..ef7396996 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -2,11 +2,14 @@ package s3api
import (
"context"
+ "errors"
"fmt"
"strings"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@@ -108,6 +111,110 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_
return err
}
+func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error {
+ // Use iterative approach with a queue to avoid recursive WithFilerClient calls
+ // which would create a new connection for each subdirectory
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ ctx := context.Background()
+ var updateErrors []error
+ dirsToProcess := []string{parentDirectoryPath}
+
+ for len(dirsToProcess) > 0 {
+ dir := dirsToProcess[0]
+ dirsToProcess = dirsToProcess[1:]
+
+ // Process directory in paginated batches
+ if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil {
+ updateErrors = append(updateErrors, err)
+ }
+ }
+
+ if len(updateErrors) > 0 {
+ return errors.Join(updateErrors...)
+ }
+ return nil
+ })
+}
+
+// processDirectoryTTL processes a single directory in paginated batches
+func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
+ dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error {
+
+ const batchSize = filer.PaginationSize
+ startFrom := ""
+
+ for {
+ lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors)
+ if err != nil {
+ return fmt.Errorf("list entries in %s: %w", dir, err)
+ }
+
+ // If we got fewer entries than batch size, we've reached the end
+ if entryCount < batchSize {
+ break
+ }
+ startFrom = lastEntryName
+ }
+ return nil
+}
+
+// processTTLBatch processes a single batch of entries
+func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient,
+ dir string, ttlSec int32, startFrom string, batchSize uint32,
+ dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) {
+
+ err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ lastEntry = entry.Name
+ count++
+
+ if entry.IsDirectory {
+ *dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name)))
+ return nil
+ }
+
+ // Update entry TTL and S3 expiry flag
+ if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil {
+ *updateErrors = append(*updateErrors, updateErr)
+ }
+ return nil
+ }, startFrom, false, batchSize)
+
+ return lastEntry, count, err
+}
+
+// updateEntryTTL updates a single entry's TTL and S3 expiry flag
+func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
+ dir string, entry *filer_pb.Entry, ttlSec int32) error {
+
+ if entry.Attributes == nil {
+ entry.Attributes = &filer_pb.FuseAttributes{}
+ }
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+
+ // Check if both TTL and S3 expiry flag are already set correctly
+ flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true"
+ if entry.Attributes.TtlSec == ttlSec && flagAlreadySet {
+ return nil // Already up to date
+ }
+
+ // Set the S3 expiry flag
+ entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+ // Update TTL if needed
+ if entry.Attributes.TtlSec != ttlSec {
+ entry.Attributes.TtlSec = ttlSec
+ }
+
+ if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ }); err != nil {
+ return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err)
+ }
+ return nil
+}
+
func (s3a *S3ApiServer) getCollectionName(bucket string) string {
if s3a.option.FilerGroup != "" {
return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket)
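
updateEntriesTTL keeps walking even when individual entries fail, collecting the failures and surfacing them together with errors.Join. A small self-contained sketch of that aggregation (update is a stand-in, not the real UpdateEntry call):

    package main

    import (
        "errors"
        "fmt"
    )

    // updateAll mirrors the error handling in updateEntriesTTL: record each
    // per-entry failure, keep going, and return everything joined at the end.
    func updateAll(names []string, update func(string) error) error {
        var errs []error
        for _, n := range names {
            if err := update(n); err != nil {
                errs = append(errs, fmt.Errorf("file %s: %w", n, err))
            }
        }
        return errors.Join(errs...) // nil when no errors were collected
    }

    func main() {
        err := updateAll([]string{"a", "b", "c"}, func(n string) error {
            if n == "b" {
                return errors.New("update failed")
            }
            return nil
        })
        fmt.Println(err) // file b: update failed
    }
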
diff --git a/weed/s3api/s3_constants/extend_key.go b/weed/s3api/s3_constants/extend_key.go
index f0f223a45..d57798341 100644
--- a/weed/s3api/s3_constants/extend_key.go
+++ b/weed/s3api/s3_constants/extend_key.go
@@ -11,6 +11,7 @@ const (
ExtETagKey = "Seaweed-X-Amz-ETag"
ExtLatestVersionIdKey = "Seaweed-X-Amz-Latest-Version-Id"
ExtLatestVersionFileNameKey = "Seaweed-X-Amz-Latest-Version-File-Name"
+ ExtMultipartObjectKey = "key"
// Bucket Policy
ExtBucketPolicyKey = "Seaweed-X-Amz-Bucket-Policy"
diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go
index 82a270111..77ed310d9 100644
--- a/weed/s3api/s3_constants/header.go
+++ b/weed/s3api/s3_constants/header.go
@@ -42,6 +42,7 @@ const (
SeaweedFSIsDirectoryKey = "X-Seaweedfs-Is-Directory-Key"
SeaweedFSPartNumber = "X-Seaweedfs-Part-Number"
SeaweedFSUploadId = "X-Seaweedfs-Upload-Id"
+ SeaweedFSExpiresS3 = "X-Seaweedfs-Expires-S3"
// S3 ACL headers
AmzCannedAcl = "X-Amz-Acl"
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index ead77041e..9509219d9 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -7,6 +7,7 @@ import (
"encoding/xml"
"errors"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"math"
"net/http"
"path"
@@ -792,9 +793,9 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
if rule.Expiration.Days == 0 {
continue
}
-
+ locationPrefix := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix)
locConf := &filer_pb.FilerConf_PathConf{
- LocationPrefix: fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, rulePrefix),
+ LocationPrefix: locationPrefix,
Collection: collectionName,
Ttl: fmt.Sprintf("%dd", rule.Expiration.Days),
}
@@ -806,6 +807,13 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
+ ttlSec := int32((time.Duration(rule.Expiration.Days) * util.LifeCycleInterval).Seconds())
+ glog.V(2).Infof("Start updating TTL for %s", locationPrefix)
+ if updErr := s3a.updateEntriesTTL(locationPrefix, ttlSec); updErr != nil {
+ glog.Errorf("PutBucketLifecycleConfigurationHandler update TTL for %s: %s", locationPrefix, updErr)
+ } else {
+ glog.V(2).Infof("Finished updating TTL for %s", locationPrefix)
+ }
changed = true
}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 163633e22..8917393be 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -375,7 +375,6 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Restore the original Range header for SSE processing
if sseObject && originalRangeHeader != "" {
r.Header.Set("Range", originalRangeHeader)
-
}
// Add SSE metadata headers based on object metadata before SSE processing
@@ -603,7 +602,6 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
resp.Body.Close()
return
}
-
setUserMetadataKeyToLowercase(resp)
responseStatusCode, bytesTransferred := responseFn(resp, w)
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index 3a2544710..f779a6edc 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -1,6 +1,7 @@
package s3api
import (
+ "context"
"encoding/xml"
"fmt"
"io"
@@ -8,14 +9,11 @@ import (
"slices"
"strings"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
-
"github.com/seaweedfs/seaweedfs/weed/filer"
-
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
-
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@@ -129,22 +127,19 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
dir, name := target.DirAndName()
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Use operation context that won't be cancelled if request terminates
+ // This ensures deletion completes atomically to avoid inconsistent state
+ opCtx := context.WithoutCancel(r.Context())
if err := doDeleteEntry(client, dir, name, true, false); err != nil {
return err
}
- if s3a.option.AllowEmptyFolder {
- return nil
- }
-
- directoriesWithDeletion := make(map[string]int)
- if strings.LastIndex(object, "/") > 0 {
- directoriesWithDeletion[dir]++
- // purge empty folders, only checking folders with deletions
- for len(directoriesWithDeletion) > 0 {
- directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
- }
+ // Cleanup empty directories
+ if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
+ bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
+ // Recursively delete empty parent directories, stop at bucket path
+ filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
}
return nil
@@ -227,7 +222,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deleteErrors []DeleteError
var auditLog *s3err.AccessLog
- directoriesWithDeletion := make(map[string]int)
+ directoriesWithDeletion := make(map[string]bool)
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
@@ -250,6 +245,9 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
versioningConfigured := (versioningState != "")
s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Use operation context that won't be cancelled if request terminates
+ // This ensures batch deletion completes atomically to avoid inconsistent state
+ opCtx := context.WithoutCancel(r.Context())
// delete file entries
for _, object := range deleteObjects.Objects {
@@ -359,12 +357,14 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
- directoriesWithDeletion[parentDirectoryPath]++
+ // Track directory for empty directory cleanup
+ if !s3a.option.AllowEmptyFolder {
+ directoriesWithDeletion[parentDirectoryPath] = true
+ }
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
} else {
- delete(directoriesWithDeletion, parentDirectoryPath)
deleteErrors = append(deleteErrors, DeleteError{
Code: "",
Message: err.Error(),
@@ -380,13 +380,29 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
}
- if s3a.option.AllowEmptyFolder {
- return nil
- }
+ // Cleanup empty directories - optimize by processing deepest first
+ if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
+ bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
- // purge empty folders, only checking folders with deletions
- for len(directoriesWithDeletion) > 0 {
- directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
+ // Collect and sort directories by depth (deepest first) to avoid redundant checks
+ var allDirs []string
+ for dirPath := range directoriesWithDeletion {
+ allDirs = append(allDirs, dirPath)
+ }
+ // Sort by depth (deeper directories first)
+ slices.SortFunc(allDirs, func(a, b string) int {
+ return strings.Count(b, "/") - strings.Count(a, "/")
+ })
+
+ // Track already-checked directories to avoid redundant work
+ checked := make(map[string]bool)
+ for _, dirPath := range allDirs {
+ if !checked[dirPath] {
+ // Recursively delete empty parent directories, stop at bucket path
+ // Mark this directory and all its parents as checked during recursion
+ filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked)
+ }
+ }
}
return nil
@@ -403,26 +419,3 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
writeSuccessResponseXML(w, r, deleteResp)
}
-
-func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
- var allDirs []string
- for dir := range directoriesWithDeletion {
- allDirs = append(allDirs, dir)
- }
- slices.SortFunc(allDirs, func(a, b string) int {
- return len(b) - len(a)
- })
- newDirectoriesWithDeletion = make(map[string]int)
- for _, dir := range allDirs {
- parentDir, dirName := util.FullPath(dir).DirAndName()
- if parentDir == s3a.option.BucketsPath {
- continue
- }
- if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
- glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
- } else {
- newDirectoriesWithDeletion[parentDir]++
- }
- }
- return
-}
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 148df89f6..0d07c548e 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -333,7 +333,8 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
proxyReq.Header.Set(s3_constants.SeaweedFSSSES3Key, base64.StdEncoding.EncodeToString(sseS3Metadata))
glog.V(3).Infof("putToFiler: storing SSE-S3 metadata for object %s with keyID %s", uploadUrl, sseS3Key.KeyID)
}
-
+ // Set TTL-based S3 expiry (modification time)
+ proxyReq.Header.Set(s3_constants.SeaweedFSExpiresS3, "true")
// ensure that the Authorization header is overriding any previous
// Authorization header which might be already present in proxyReq
s3a.maybeAddFilerJwtAuthorization(proxyReq, true)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d2b3d8b52..fba693f43 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -136,8 +136,17 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
if err := fs.checkPermissions(ctx, r, fileName); err != nil {
return nil, nil, err
}
+ // Disable TTL-based (creation time) deletion when S3 expiry (modification time) is enabled
+ soMaybeWithOutTTL := so
+ if so.TtlSeconds > 0 {
+ if s3ExpiresValue := r.Header.Get(s3_constants.SeaweedFSExpiresS3); s3ExpiresValue == "true" {
+ clone := *so
+ clone.TtlSeconds = 0
+ soMaybeWithOutTTL = &clone
+ }
+ }
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadRequestToChunks(ctx, w, r, r.Body, chunkSize, fileName, contentType, contentLength, soMaybeWithOutTTL)
if err != nil {
return nil, nil, err
@@ -330,7 +339,9 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
entry.Extended = SaveAmzMetaData(r, entry.Extended, false)
-
+ if entry.TtlSec > 0 && r.Header.Get(s3_constants.SeaweedFSExpiresS3) == "true" {
+ entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+ }
for k, v := range r.Header {
if len(v) > 0 && len(v[0]) > 0 {
if strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
diff --git a/weed/util/constants_lifecycle_interval_10sec.go b/weed/util/constants_lifecycle_interval_10sec.go
new file mode 100644
index 000000000..60f19c316
--- /dev/null
+++ b/weed/util/constants_lifecycle_interval_10sec.go
@@ -0,0 +1,8 @@
+//go:build s3tests
+// +build s3tests
+
+package util
+
+import "time"
+
+const LifeCycleInterval = 10 * time.Second
diff --git a/weed/util/constants_lifecycle_interval_day.go b/weed/util/constants_lifecycle_interval_day.go
new file mode 100644
index 000000000..e2465ad5f
--- /dev/null
+++ b/weed/util/constants_lifecycle_interval_day.go
@@ -0,0 +1,8 @@
+//go:build !s3tests
+// +build !s3tests
+
+package util
+
+import "time"
+
+const LifeCycleInterval = 24 * time.Hour
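
The two new files pick the lifecycle tick at compile time: a normal build treats one lifecycle day as 24 hours, while a build with -tags s3tests compresses it to 10 seconds so expiration can be exercised quickly in the s3-tests suite. The bucket handler's TTL computation then scales with whichever constant was compiled in. A standalone illustration with the constant inlined instead of imported from the util package:

    package main

    import (
        "fmt"
        "time"
    )

    // lifeCycleInterval mirrors util.LifeCycleInterval: 24h in normal builds,
    // 10s when built with `go build -tags s3tests`.
    const lifeCycleInterval = 24 * time.Hour

    // ttlSecondsForDays reproduces the handler's conversion of a lifecycle
    // rule's Expiration.Days into the TtlSec stored on matching entries.
    func ttlSecondsForDays(days int) int32 {
        return int32((time.Duration(days) * lifeCycleInterval).Seconds())
    }

    func main() {
        fmt.Println(ttlSecondsForDays(7)) // 604800 seconds with the 24h interval
    }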