aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/filer.go14
-rw-r--r--weed/filer2/filer_master.go60
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go2
-rw-r--r--weed/filer2/memdb/memdb_store_test.go4
4 files changed, 70 insertions, 10 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 0b4113c38..e886b7d74 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -13,14 +13,16 @@ import (
)
type Filer struct {
- master string
+ masters []string
store FilerStore
directoryCache *ccache.Cache
+
+ currentMaster string
}
-func NewFiler(master string) *Filer {
+func NewFiler(masters []string) *Filer {
return &Filer{
- master: master,
+ masters: masters,
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
}
}
@@ -175,14 +177,12 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
}
func (f *Filer) deleteChunks(entry *Entry) {
- if f.master == "" {
- return
- }
+
if entry == nil {
return
}
for _, chunk := range entry.Chunks {
- if err := operation.DeleteFile(f.master, chunk.FileId, ""); err != nil {
+ if err := operation.DeleteFile(f.GetMaster(), chunk.FileId, ""); err != nil {
glog.V(0).Infof("deleting file %s: %v", chunk.FileId, err)
}
}
diff --git a/weed/filer2/filer_master.go b/weed/filer2/filer_master.go
new file mode 100644
index 000000000..f69f68a85
--- /dev/null
+++ b/weed/filer2/filer_master.go
@@ -0,0 +1,60 @@
+package filer2
+
+import (
+ "fmt"
+ "context"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "google.golang.org/grpc"
+)
+
+func (fs *Filer) GetMaster() string {
+ return fs.currentMaster
+}
+
+func (fs *Filer) KeepConnectedToMaster() {
+ glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
+ for _, master := range fs.masters {
+ glog.V(0).Infof("Connecting to %v", master)
+ withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ stream, err := client.KeepConnected(context.Background())
+ if err != nil {
+ glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
+ return err
+ }
+
+ glog.V(0).Infof("Connected to %v", master)
+ fs.currentMaster = master
+
+ for {
+ time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
+
+ if err = stream.Send(&master_pb.Empty{}); err != nil {
+ glog.V(0).Infof("failed to send to %s: %v", master, err)
+ return err
+ }
+
+ if _, err = stream.Recv(); err != nil {
+ glog.V(0).Infof("failed to receive from %s: %v", master, err)
+ return err
+ }
+ }
+ })
+ fs.currentMaster = ""
+ }
+}
+
+func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+
+ grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
+ if err != nil {
+ return fmt.Errorf("fail to dial %s: %v", master, err)
+ }
+ defer grpcConnection.Close()
+
+ client := master_pb.NewSeaweedClient(grpcConnection)
+
+ return fn(client)
+}
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 896dabdc3..ad72a2e60 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -8,7 +8,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler("")
+ filer := filer2.NewFiler(nil)
dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
defer os.RemoveAll(dir)
store := &LevelDBStore{}
diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go
index 5265ed248..160b4a16d 100644
--- a/weed/filer2/memdb/memdb_store_test.go
+++ b/weed/filer2/memdb/memdb_store_test.go
@@ -6,7 +6,7 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler("")
+ filer := filer2.NewFiler(nil)
store := &MemDbStore{}
store.Initialize(nil)
filer.SetStore(store)
@@ -43,7 +43,7 @@ func TestCreateAndFind(t *testing.T) {
}
func TestCreateFileAndList(t *testing.T) {
- filer := filer2.NewFiler("")
+ filer := filer2.NewFiler(nil)
store := &MemDbStore{}
store.Initialize(nil)
filer.SetStore(store)