aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-06-28 15:02:46 -0700
committerchrislu <chris.lu@gmail.com>2024-06-28 15:02:46 -0700
commitd8dfadb617c5ab0b782f6bf746d48abf5800705d (patch)
tree15aae1c4556814390ac24d4cb35e7307c345c585
parent1c0718f26d748d908c0d4ee1cc5dce071c1af74b (diff)
downloadseaweedfs-d8dfadb617c5ab0b782f6bf746d48abf5800705d.tar.xz
seaweedfs-d8dfadb617c5ab0b782f6bf746d48abf5800705d.zip
ParallelProcessDirectoryStructure
need to use this for simpler parallel processing
-rw-r--r--weed/filer/meta_replay.go38
1 files changed, 38 insertions, 0 deletions
diff --git a/weed/filer/meta_replay.go b/weed/filer/meta_replay.go
index 51c4e6987..0432e17de 100644
--- a/weed/filer/meta_replay.go
+++ b/weed/filer/meta_replay.go
@@ -2,6 +2,7 @@ package filer
import (
"context"
+ "sync"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -35,3 +36,40 @@ func Replay(filerStore FilerStore, resp *filer_pb.SubscribeMetadataResponse) err
return nil
}
+
+
+// ParallelProcessDirectoryStructure processes each entry in parallel, and also ensure parent directories are processed first.
+// This also assumes the parent directories are in the entryChan already.
+func ParallelProcessDirectoryStructure(entryChan chan *Entry, concurrency int, eachEntryFn func(entry *Entry)(error)) (firstErr error) {
+
+ executors := util.NewLimitedConcurrentExecutor(concurrency)
+
+ var wg sync.WaitGroup
+ for entry := range entryChan {
+ wg.Add(1)
+ if entry.IsDirectory() {
+ func() {
+ defer wg.Done()
+ if err := eachEntryFn(entry); err != nil {
+ if firstErr == nil {
+ firstErr = err
+ }
+ }
+ }()
+ } else {
+ executors.Execute(func() {
+ defer wg.Done()
+ if err := eachEntryFn(entry); err != nil {
+ if firstErr == nil {
+ firstErr = err
+ }
+ }
+ })
+ }
+ if firstErr != nil {
+ break
+ }
+ }
+ wg.Wait()
+ return
+}