diff options
Diffstat (limited to 'weed/server/filer_grpc_server_traverse_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_traverse_meta.go | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go new file mode 100644 index 000000000..4a924f065 --- /dev/null +++ b/weed/server/filer_grpc_server_traverse_meta.go @@ -0,0 +1,84 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/viant/ptrie" +) + +func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error { + + glog.V(0).Infof("TraverseBfsMetadata %v", req) + + excludedTrie := ptrie.New[bool]() + for _, excluded := range req.ExcludedPrefixes { + excludedTrie.Put([]byte(excluded), true) + } + + ctx := stream.Context() + + queue := util.NewQueue[*filer.Entry]() + dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory)) + if err != nil { + return fmt.Errorf("find dir %s: %v", req.Directory, err) + } + queue.Enqueue(dirEntry) + + for item := queue.Dequeue(); item != nil; item = queue.Dequeue() { + if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool { + return true + }) { + // println("excluded", item.FullPath) + continue + } + parent, _ := item.FullPath.DirAndName() + if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{ + Directory: parent, + Entry: item.ToProtoEntry(), + }); err != nil { + return fmt.Errorf("send traverse bfs metadata response: %v", err) + } + + if !item.IsDirectory() { + continue + } + + if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error { + queue.Enqueue(entry) + return nil + }); err != nil { + return err + } + } + + return nil +} + +func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) { + var lastFileName string + var listErr error + for { + var hasEntries bool + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool { + hasEntries = true + if fnErr := fn(entry); fnErr != nil { + err = fnErr + return false + } + return true + }) + if listErr != nil { + return listErr + } + if err != nil { + return err + } + if !hasEntries { + return nil + } + } +} |
