aboutsummaryrefslogtreecommitdiff
path: root/weed/remote_storage/traverse_bfs.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/remote_storage/traverse_bfs.go')
-rw-r--r--weed/remote_storage/traverse_bfs.go62
1 files changed, 62 insertions, 0 deletions
diff --git a/weed/remote_storage/traverse_bfs.go b/weed/remote_storage/traverse_bfs.go
new file mode 100644
index 000000000..4056f8715
--- /dev/null
+++ b/weed/remote_storage/traverse_bfs.go
@@ -0,0 +1,62 @@
+package remote_storage
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "sync"
+ "time"
+)
+
+type ListDirectoryFunc func(parentDir util.FullPath, visitFn VisitFunc) error
+
+func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc) (err error) {
+ K := 5
+
+ var dirQueueWg sync.WaitGroup
+ dirQueue := util.NewQueue()
+ dirQueueWg.Add(1)
+ dirQueue.Enqueue(parentPath)
+ var isTerminating bool
+
+ for i := 0; i < K; i++ {
+ go func() {
+ for {
+ if isTerminating {
+ break
+ }
+ t := dirQueue.Dequeue()
+ if t == nil {
+ time.Sleep(329 * time.Millisecond)
+ continue
+ }
+ dir := t.(util.FullPath)
+ processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg)
+ if processErr != nil {
+ err = processErr
+ }
+ dirQueueWg.Done()
+ }
+ }()
+ }
+
+ dirQueueWg.Wait()
+ isTerminating = true
+ return
+
+}
+
+func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) (error) {
+
+ return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
+ if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil {
+ return err
+ }
+ if !isDirectory {
+ return nil
+ }
+ dirQueueWg.Add(1)
+ dirQueue.Enqueue(parentPath.Child(name))
+ return nil
+ })
+
+}