diff options
| author | chrislu <chris.lu@gmail.com> | 2025-11-05 13:37:40 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-11-05 13:37:40 -0800 |
| commit | 629b520edf2c6ab152f62bee9c50df156071fd01 (patch) | |
| tree | e46b062d1d1171879501f4509d45e6df95e6a456 | |
| parent | 0cad84ee3609b449cbd330bc249bafd61366a6a7 (diff) | |
| download | seaweedfs-629b520edf2c6ab152f62bee9c50df156071fd01.tar.xz seaweedfs-629b520edf2c6ab152f62bee9c50df156071fd01.zip | |
use iterative approach with a queue to avoid recursive WithFilerClient calls
| -rw-r--r-- | weed/s3api/filer_util.go | 62 |
1 files changed, 35 insertions, 27 deletions
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go index a456ed611..6bf745dbf 100644 --- a/weed/s3api/filer_util.go +++ b/weed/s3api/filer_util.go @@ -112,43 +112,51 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_ } func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error { - var updateErrors []error - err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // Use iterative approach with a queue to avoid recursive WithFilerClient calls + // which would create a new connection for each subdirectory + return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + var updateErrors []error + dirsToProcess := []string{parentDirectoryPath} ctx := context.Background() - if listErr := filer_pb.SeaweedList(ctx, client, parentDirectoryPath, "", func(entry *filer_pb.Entry, isLast bool) error { - if entry.IsDirectory { - if err := s3a.updateEntriesTTL(fmt.Sprintf("%s/%s", strings.TrimRight(parentDirectoryPath, "/"), entry.Name), ttlSec); err != nil { - updateErrors = append(updateErrors, fmt.Errorf("dir %s: %w", entry.Name, err)) + + for len(dirsToProcess) > 0 { + dir := dirsToProcess[0] + dirsToProcess = dirsToProcess[1:] + + if listErr := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory { + // Add subdirectory to queue for processing + dirsToProcess = append(dirsToProcess, fmt.Sprintf("%s/%s", strings.TrimRight(dir, "/"), entry.Name)) + return nil + } + if entry.Attributes == nil { + entry.Attributes = &filer_pb.FuseAttributes{} + } + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") + if entry.Attributes.TtlSec == ttlSec { + return nil + } + entry.Attributes.TtlSec = ttlSec + if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }); err != nil { + updateErrors = append(updateErrors, fmt.Errorf("file %s/%s: %w", dir, entry.Name, err)) } return nil + }, "", false, math.MaxInt32); listErr != nil { + updateErrors = append(updateErrors, fmt.Errorf("list entries in %s: %w", dir, listErr)) } - if entry.Attributes == nil { - entry.Attributes = &filer_pb.FuseAttributes{} - } - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") - if entry.Attributes.TtlSec == ttlSec { - return nil - } - entry.Attributes.TtlSec = ttlSec - if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ - Directory: parentDirectoryPath, - Entry: entry, - }); err != nil { - updateErrors = append(updateErrors, fmt.Errorf("file %s: %w", entry.Name, err)) - } - return nil - }, "", false, math.MaxInt32); listErr != nil { - return fmt.Errorf("list entries in %s: %w", parentDirectoryPath, listErr) } + if len(updateErrors) > 0 { return fmt.Errorf("failed to update %d entries: %v", len(updateErrors), updateErrors[0]) } return nil }) - return err } func (s3a *S3ApiServer) getCollectionName(bucket string) string { |
