aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-06-28 14:57:20 -0700
committerchrislu <chris.lu@gmail.com>2024-06-28 14:57:20 -0700
commitc030cb3ce98701e266fa54031b70be805af6835b (patch)
tree4dbe3735593edf183c2bf5dbba4561f3dfb878ac /weed/server
parent00f87e5bb5b95f1568bcba4876d33e11e8eef1b1 (diff)
downloadseaweedfs-c030cb3ce98701e266fa54031b70be805af6835b.tar.xz
seaweedfs-c030cb3ce98701e266fa54031b70be805af6835b.zip
bootstrap filer from one peer
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_traverse_meta.go84
-rw-r--r--weed/server/filer_grpc_server_traverse_meta_test.go31
-rw-r--r--weed/server/filer_server.go2
3 files changed, 116 insertions, 1 deletions
diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go
new file mode 100644
index 000000000..4a924f065
--- /dev/null
+++ b/weed/server/filer_grpc_server_traverse_meta.go
@@ -0,0 +1,84 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/viant/ptrie"
+)
+
+func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error {
+
+ glog.V(0).Infof("TraverseBfsMetadata %v", req)
+
+ excludedTrie := ptrie.New[bool]()
+ for _, excluded := range req.ExcludedPrefixes {
+ excludedTrie.Put([]byte(excluded), true)
+ }
+
+ ctx := stream.Context()
+
+ queue := util.NewQueue[*filer.Entry]()
+ dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory))
+ if err != nil {
+ return fmt.Errorf("find dir %s: %v", req.Directory, err)
+ }
+ queue.Enqueue(dirEntry)
+
+ for item := queue.Dequeue(); item != nil; item = queue.Dequeue() {
+ if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool {
+ return true
+ }) {
+ // println("excluded", item.FullPath)
+ continue
+ }
+ parent, _ := item.FullPath.DirAndName()
+ if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{
+ Directory: parent,
+ Entry: item.ToProtoEntry(),
+ }); err != nil {
+ return fmt.Errorf("send traverse bfs metadata response: %v", err)
+ }
+
+ if !item.IsDirectory() {
+ continue
+ }
+
+ if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error {
+ queue.Enqueue(entry)
+ return nil
+ }); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) {
+ var lastFileName string
+ var listErr error
+ for {
+ var hasEntries bool
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
+ hasEntries = true
+ if fnErr := fn(entry); fnErr != nil {
+ err = fnErr
+ return false
+ }
+ return true
+ })
+ if listErr != nil {
+ return listErr
+ }
+ if err != nil {
+ return err
+ }
+ if !hasEntries {
+ return nil
+ }
+ }
+}
diff --git a/weed/server/filer_grpc_server_traverse_meta_test.go b/weed/server/filer_grpc_server_traverse_meta_test.go
new file mode 100644
index 000000000..72f8a916e
--- /dev/null
+++ b/weed/server/filer_grpc_server_traverse_meta_test.go
@@ -0,0 +1,31 @@
+package weed_server
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/viant/ptrie"
+ "testing"
+)
+
+func TestPtrie(t *testing.T) {
+ b := []byte("/topics/abc/dev")
+ excludedTrie := ptrie.New[bool]()
+ excludedTrie.Put([]byte("/topics/abc/d"), true)
+ excludedTrie.Put([]byte("/topics/abc"), true)
+
+ assert.True(t, excludedTrie.MatchPrefix(b, func(key []byte, value bool) bool {
+ println("matched1", string(key))
+ return true
+ }))
+
+ assert.True(t, excludedTrie.MatchAll(b, func(key []byte, value bool) bool {
+ println("matched2", string(key))
+ return true
+ }))
+
+ assert.False(t, excludedTrie.MatchAll([]byte("/topics/ab"), func(key []byte, value bool) bool {
+ println("matched3", string(key))
+ return true
+ }))
+
+ assert.False(t, excludedTrie.Has(b))
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 0b7254c0d..7bc1b361c 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -203,7 +203,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
}
if isFresh {
glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
- if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
+ if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil {
glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err)
}
}