diff options
Diffstat (limited to 'weed/s3api/filer_util.go')
| -rw-r--r-- | weed/s3api/filer_util.go | 107 |
1 files changed, 107 insertions, 0 deletions
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index 9dd9a684e..ef7396996 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -2,11 +2,14 @@ package s3api import ( "context" + "errors" "fmt" "strings" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/util" ) @@ -108,6 +111,110 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_ return err } +func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error { + // Use iterative approach with a queue to avoid recursive WithFilerClient calls + // which would create a new connection for each subdirectory + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + ctx := context.Background() + var updateErrors []error + dirsToProcess := []string{parentDirectoryPath} + + for len(dirsToProcess) > 0 { + dir := dirsToProcess[0] + dirsToProcess = dirsToProcess[1:] + + // Process directory in paginated batches + if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil { + updateErrors = append(updateErrors, err) + } + } + + if len(updateErrors) > 0 { + return errors.Join(updateErrors...) + } + return nil + }) +} + +// processDirectoryTTL processes a single directory in paginated batches +func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, + dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error { + + const batchSize = filer.PaginationSize + startFrom := "" + + for { + lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors) + if err != nil { + return fmt.Errorf("list entries in %s: %w", dir, err) + } + + // If we got fewer entries than batch size, we've reached the end + if entryCount < batchSize { + break + } + startFrom = lastEntryName + } + return nil +} + +// processTTLBatch processes a single batch of entries +func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient, + dir string, ttlSec int32, startFrom string, batchSize uint32, + dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) { + + err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { + lastEntry = entry.Name + count++ + + if entry.IsDirectory { + *dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name))) + return nil + } + + // Update entry TTL and S3 expiry flag + if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil { + *updateErrors = append(*updateErrors, updateErr) + } + return nil + }, startFrom, false, batchSize) + + return lastEntry, count, err +} + +// updateEntryTTL updates a single entry's TTL and S3 expiry flag +func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, + dir string, entry *filer_pb.Entry, ttlSec int32) error { + + if entry.Attributes == nil { + entry.Attributes = &filer_pb.FuseAttributes{} + } + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + + // Check if both TTL and S3 expiry flag are already set correctly + flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true" + if entry.Attributes.TtlSec == ttlSec && flagAlreadySet { + return nil // Already up to date + } + + // Set the S3 expiry flag + entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") + // Update TTL if needed + if entry.Attributes.TtlSec != ttlSec { + entry.Attributes.TtlSec = ttlSec + } + + if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }); err != nil { + return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err) + } + return nil +} + func (s3a *S3ApiServer) getCollectionName(bucket string) string { if s3a.option.FilerGroup != "" { return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket) |
