diff options
| author | chrislu <chris.lu@gmail.com> | 2024-06-28 15:02:46 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-06-28 15:02:46 -0700 |
| commit | d8dfadb617c5ab0b782f6bf746d48abf5800705d (patch) | |
| tree | 15aae1c4556814390ac24d4cb35e7307c345c585 | |
| parent | 1c0718f26d748d908c0d4ee1cc5dce071c1af74b (diff) | |
| download | seaweedfs-d8dfadb617c5ab0b782f6bf746d48abf5800705d.tar.xz seaweedfs-d8dfadb617c5ab0b782f6bf746d48abf5800705d.zip | |
ParallelProcessDirectoryStructure
need to use this for simpler parallel processing
| -rw-r--r-- | weed/filer/meta_replay.go | 38 |
1 files changed, 38 insertions, 0 deletions
diff --git a/weed/filer/meta_replay.go b/weed/filer/meta_replay.go index 51c4e6987..0432e17de 100644 --- a/weed/filer/meta_replay.go +++ b/weed/filer/meta_replay.go @@ -2,6 +2,7 @@ package filer import ( "context" + "sync" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -35,3 +36,40 @@ func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) err return nil } + + +// ParallelProcessDirectoryStructure processes each entry in parallel, and also ensure parent directories are processed first. +// This also assumes the parent directories are in the entryChan already. +func ParallelProcessDirectoryStructure(entryChan chan *Entry, concurrency int, eachEntryFn func(entry *Entry)(error)) (firstErr error) { + + executors := util.NewLimitedConcurrentExecutor(concurrency) + + var wg sync.WaitGroup + for entry := range entryChan { + wg.Add(1) + if entry.IsDirectory() { + func() { + defer wg.Done() + if err := eachEntryFn(entry); err != nil { + if firstErr == nil { + firstErr = err + } + } + }() + } else { + executors.Execute(func() { + defer wg.Done() + if err := eachEntryFn(entry); err != nil { + if firstErr == nil { + firstErr = err + } + } + }) + } + if firstErr != nil { + break + } + } + wg.Wait() + return +} |
