aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_traverse_meta.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_grpc_server_traverse_meta.go')
-rw-r--r--weed/server/filer_grpc_server_traverse_meta.go84
1 files changed, 84 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go
new file mode 100644
index 000000000..4a924f065
--- /dev/null
+++ b/weed/server/filer_grpc_server_traverse_meta.go
@@ -0,0 +1,84 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/viant/ptrie"
+)
+
+func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error {
+
+ glog.V(0).Infof("TraverseBfsMetadata %v", req)
+
+ excludedTrie := ptrie.New[bool]()
+ for _, excluded := range req.ExcludedPrefixes {
+ excludedTrie.Put([]byte(excluded), true)
+ }
+
+ ctx := stream.Context()
+
+ queue := util.NewQueue[*filer.Entry]()
+ dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory))
+ if err != nil {
+ return fmt.Errorf("find dir %s: %v", req.Directory, err)
+ }
+ queue.Enqueue(dirEntry)
+
+ for item := queue.Dequeue(); item != nil; item = queue.Dequeue() {
+ if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool {
+ return true
+ }) {
+ // println("excluded", item.FullPath)
+ continue
+ }
+ parent, _ := item.FullPath.DirAndName()
+ if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{
+ Directory: parent,
+ Entry: item.ToProtoEntry(),
+ }); err != nil {
+ return fmt.Errorf("send traverse bfs metadata response: %v", err)
+ }
+
+ if !item.IsDirectory() {
+ continue
+ }
+
+ if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error {
+ queue.Enqueue(entry)
+ return nil
+ }); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) {
+ var lastFileName string
+ var listErr error
+ for {
+ var hasEntries bool
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
+ hasEntries = true
+ if fnErr := fn(entry); fnErr != nil {
+ err = fnErr
+ return false
+ }
+ return true
+ })
+ if listErr != nil {
+ return listErr
+ }
+ if err != nil {
+ return err
+ }
+ if !hasEntries {
+ return nil
+ }
+ }
+}