diff options
Diffstat (limited to 'weed/remote_storage/traverse_bfs.go')
| -rw-r--r-- | weed/remote_storage/traverse_bfs.go | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/weed/remote_storage/traverse_bfs.go b/weed/remote_storage/traverse_bfs.go new file mode 100644 index 000000000..4056f8715 --- /dev/null +++ b/weed/remote_storage/traverse_bfs.go @@ -0,0 +1,62 @@ +package remote_storage + +import ( + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "sync" + "time" +) + +type ListDirectoryFunc func(parentDir util.FullPath, visitFn VisitFunc) error + +func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc) (err error) { + K := 5 + + var dirQueueWg sync.WaitGroup + dirQueue := util.NewQueue() + dirQueueWg.Add(1) + dirQueue.Enqueue(parentPath) + var isTerminating bool + + for i := 0; i < K; i++ { + go func() { + for { + if isTerminating { + break + } + t := dirQueue.Dequeue() + if t == nil { + time.Sleep(329 * time.Millisecond) + continue + } + dir := t.(util.FullPath) + processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg) + if processErr != nil { + err = processErr + } + dirQueueWg.Done() + } + }() + } + + dirQueueWg.Wait() + isTerminating = true + return + +} + +func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) (error) { + + return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error { + if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil { + return err + } + if !isDirectory { + return nil + } + dirQueueWg.Add(1) + dirQueue.Enqueue(parentPath.Child(name)) + return nil + }) + +} |
