diff options
| author | chrislu <chris.lu@gmail.com> | 2024-06-28 14:57:20 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-06-28 14:57:20 -0700 |
| commit | c030cb3ce98701e266fa54031b70be805af6835b (patch) | |
| tree | 4dbe3735593edf183c2bf5dbba4561f3dfb878ac /weed/server | |
| parent | 00f87e5bb5b95f1568bcba4876d33e11e8eef1b1 (diff) | |
| download | seaweedfs-c030cb3ce98701e266fa54031b70be805af6835b.tar.xz seaweedfs-c030cb3ce98701e266fa54031b70be805af6835b.zip | |
bootstrap filer from one peer
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_traverse_meta.go | 84 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_traverse_meta_test.go | 31 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 2 |
3 files changed, 116 insertions, 1 deletions
diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go new file mode 100644 index 000000000..4a924f065 --- /dev/null +++ b/weed/server/filer_grpc_server_traverse_meta.go @@ -0,0 +1,84 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/viant/ptrie" +) + +func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error { + + glog.V(0).Infof("TraverseBfsMetadata %v", req) + + excludedTrie := ptrie.New[bool]() + for _, excluded := range req.ExcludedPrefixes { + excludedTrie.Put([]byte(excluded), true) + } + + ctx := stream.Context() + + queue := util.NewQueue[*filer.Entry]() + dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory)) + if err != nil { + return fmt.Errorf("find dir %s: %v", req.Directory, err) + } + queue.Enqueue(dirEntry) + + for item := queue.Dequeue(); item != nil; item = queue.Dequeue() { + if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool { + return true + }) { + // println("excluded", item.FullPath) + continue + } + parent, _ := item.FullPath.DirAndName() + if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{ + Directory: parent, + Entry: item.ToProtoEntry(), + }); err != nil { + return fmt.Errorf("send traverse bfs metadata response: %v", err) + } + + if !item.IsDirectory() { + continue + } + + if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error { + queue.Enqueue(entry) + return nil + }); err != nil { + return err + } + } + + return nil +} + +func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) { + var lastFileName string + var listErr error + for { + var hasEntries bool + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool { + hasEntries = true + if fnErr := fn(entry); fnErr != nil { + err = fnErr + return false + } + return true + }) + if listErr != nil { + return listErr + } + if err != nil { + return err + } + if !hasEntries { + return nil + } + } +} diff --git a/weed/server/filer_grpc_server_traverse_meta_test.go b/weed/server/filer_grpc_server_traverse_meta_test.go new file mode 100644 index 000000000..72f8a916e --- /dev/null +++ b/weed/server/filer_grpc_server_traverse_meta_test.go @@ -0,0 +1,31 @@ +package weed_server + +import ( + "github.com/stretchr/testify/assert" + "github.com/viant/ptrie" + "testing" +) + +func TestPtrie(t *testing.T) { + b := []byte("/topics/abc/dev") + excludedTrie := ptrie.New[bool]() + excludedTrie.Put([]byte("/topics/abc/d"), true) + excludedTrie.Put([]byte("/topics/abc"), true) + + assert.True(t, excludedTrie.MatchPrefix(b, func(key []byte, value bool) bool { + println("matched1", string(key)) + return true + })) + + assert.True(t, excludedTrie.MatchAll(b, func(key []byte, value bool) bool { + println("matched2", string(key)) + return true + })) + + assert.False(t, excludedTrie.MatchAll([]byte("/topics/ab"), func(key []byte, value bool) bool { + println("matched3", string(key)) + return true + })) + + assert.False(t, excludedTrie.Has(b)) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 0b7254c0d..7bc1b361c 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -203,7 +203,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) } if isFresh { glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes) - if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil { + if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil { glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err) } } |
