aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/s3api/filer_util.go62
1 files changed, 35 insertions, 27 deletions
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index a456ed611..6bf745dbf 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -112,43 +112,51 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_
}
func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error {
- var updateErrors []error
- err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // Use iterative approach with a queue to avoid recursive WithFilerClient calls
+ // which would create a new connection for each subdirectory
+ return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ var updateErrors []error
+ dirsToProcess := []string{parentDirectoryPath}
ctx := context.Background()
- if listErr := filer_pb.SeaweedList(ctx, client, parentDirectoryPath, "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory {
- if err := s3a.updateEntriesTTL(fmt.Sprintf("%s/%s", strings.TrimRight(parentDirectoryPath, "/"), entry.Name), ttlSec); err != nil {
- updateErrors = append(updateErrors, fmt.Errorf("dir %s: %w", entry.Name, err))
+
+ for len(dirsToProcess) > 0 {
+ dir := dirsToProcess[0]
+ dirsToProcess = dirsToProcess[1:]
+
+ if listErr := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.IsDirectory {
+ // Add subdirectory to queue for processing
+ dirsToProcess = append(dirsToProcess, fmt.Sprintf("%s/%s", strings.TrimRight(dir, "/"), entry.Name))
+ return nil
+ }
+ if entry.Attributes == nil {
+ entry.Attributes = &filer_pb.FuseAttributes{}
+ }
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+ if entry.Attributes.TtlSec == ttlSec {
+ return nil
+ }
+ entry.Attributes.TtlSec = ttlSec
+ if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ }); err != nil {
+ updateErrors = append(updateErrors, fmt.Errorf("file %s/%s: %w", dir, entry.Name, err))
}
return nil
+ }, "", false, math.MaxInt32); listErr != nil {
+ updateErrors = append(updateErrors, fmt.Errorf("list entries in %s: %w", dir, listErr))
}
- if entry.Attributes == nil {
- entry.Attributes = &filer_pb.FuseAttributes{}
- }
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
- if entry.Attributes.TtlSec == ttlSec {
- return nil
- }
- entry.Attributes.TtlSec = ttlSec
- if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
- Directory: parentDirectoryPath,
- Entry: entry,
- }); err != nil {
- updateErrors = append(updateErrors, fmt.Errorf("file %s: %w", entry.Name, err))
- }
- return nil
- }, "", false, math.MaxInt32); listErr != nil {
- return fmt.Errorf("list entries in %s: %w", parentDirectoryPath, listErr)
}
+
if len(updateErrors) > 0 {
return fmt.Errorf("failed to update %d entries: %v", len(updateErrors), updateErrors[0])
}
return nil
})
- return err
}
func (s3a *S3ApiServer) getCollectionName(bucket string) string {