diff options
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/filer.go | 14 | ||||
| -rw-r--r-- | weed/filer2/filer_master.go | 60 | ||||
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store_test.go | 2 | ||||
| -rw-r--r-- | weed/filer2/memdb/memdb_store_test.go | 4 |
4 files changed, 70 insertions, 10 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 0b4113c38..e886b7d74 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -13,14 +13,16 @@ import ( ) type Filer struct { - master string + masters []string store FilerStore directoryCache *ccache.Cache + + currentMaster string } -func NewFiler(master string) *Filer { +func NewFiler(masters []string) *Filer { return &Filer{ - master: master, + masters: masters, directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), } } @@ -175,14 +177,12 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) { } func (f *Filer) deleteChunks(entry *Entry) { - if f.master == "" { - return - } + if entry == nil { return } for _, chunk := range entry.Chunks { - if err := operation.DeleteFile(f.master, chunk.FileId, ""); err != nil { + if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil { glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err) } } diff --git a/weed/filer2/filer_master.go b/weed/filer2/filer_master.go new file mode 100644 index 000000000..f69f68a85 --- /dev/null +++ b/weed/filer2/filer_master.go @@ -0,0 +1,60 @@ +package filer2 + +import ( + "fmt" + "context" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/glog" + "google.golang.org/grpc" +) + +func (fs *Filer) GetMaster() string { + return fs.currentMaster +} + +func (fs *Filer) KeepConnectedToMaster() { + glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters) + for _, master := range fs.masters { + glog.V(0).Infof("Connecting to %v", master) + withMasterClient(master, func(client master_pb.SeaweedClient) error { + stream, err := client.KeepConnected(context.Background()) + if err != nil { + glog.V(0).Infof("failed to keep connected to %s: %v", master, err) + return err + } + + glog.V(0).Infof("Connected to %v", master) + fs.currentMaster = master + + for { + time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond) + + if err = stream.Send(&master_pb.Empty{}); err != nil { + glog.V(0).Infof("failed to send to %s: %v", master, err) + return err + } + + if _, err = stream.Recv(); err != nil { + glog.V(0).Infof("failed to receive from %s: %v", master, err) + return err + } + } + }) + fs.currentMaster = "" + } +} + +func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { + + grpcConnection, err := grpc.Dial(master, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", master, err) + } + defer grpcConnection.Close() + + client := master_pb.NewSeaweedClient(grpcConnection) + + return fn(client) +} diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 896dabdc3..ad72a2e60 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -8,7 +8,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler("") + filer := filer2.NewFiler(nil) dir, _ := ioutil.TempDir("", "seaweedfs_filer_test") defer os.RemoveAll(dir) store := &LevelDBStore{} diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go index 5265ed248..160b4a16d 100644 --- a/weed/filer2/memdb/memdb_store_test.go +++ b/weed/filer2/memdb/memdb_store_test.go @@ -6,7 +6,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - filer := filer2.NewFiler("") + filer := filer2.NewFiler(nil) store := &MemDbStore{} store.Initialize(nil) filer.SetStore(store) @@ -43,7 +43,7 @@ func TestCreateAndFind(t *testing.T) { } func TestCreateFileAndList(t *testing.T) { - filer := filer2.NewFiler("") + filer := filer2.NewFiler(nil) store := &MemDbStore{} store.Initialize(nil) filer.SetStore(store) |
