aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-07-28 02:10:32 -0700
committerChris Lu <chris.lu@gmail.com>2018-07-28 02:10:32 -0700
commit01bcc89803b5caefe6d1809d4a85bc8a1d19918e (patch)
tree27d2b7f382a4a584ab4315903c2a2978c596cc77 /weed/filer2
parent1ab8232b55e4572a4b456f02854640f74acbceef (diff)
downloadseaweedfs-01bcc89803b5caefe6d1809d4a85bc8a1d19918e.tar.xz
seaweedfs-01bcc89803b5caefe6d1809d4a85bc8a1d19918e.zip
refactor into MasterClient
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/filer.go16
-rw-r--r--weed/filer2/filer_master.go66
2 files changed, 12 insertions, 70 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 2deb8ffd5..f5c2849fe 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -11,20 +11,20 @@ import (
"path/filepath"
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "context"
)
type Filer struct {
- masters []string
store FilerStore
directoryCache *ccache.Cache
-
- currentMaster string
+ masterClient *wdclient.MasterClient
}
func NewFiler(masters []string) *Filer {
return &Filer{
- masters: masters,
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ masterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
}
}
@@ -36,6 +36,14 @@ func (f *Filer) DisableDirectoryCache() {
f.directoryCache = nil
}
+func (fs *Filer) GetMaster() string {
+ return fs.masterClient.GetMaster()
+}
+
+func (fs *Filer) KeepConnectedToMaster() {
+ fs.masterClient.KeepConnectedToMaster()
+}
+
func (f *Filer) CreateEntry(entry *Entry) error {
dirParts := strings.Split(string(entry.FullPath), "/")
diff --git a/weed/filer2/filer_master.go b/weed/filer2/filer_master.go
deleted file mode 100644
index bbac17940..000000000
--- a/weed/filer2/filer_master.go
+++ /dev/null
@@ -1,66 +0,0 @@
-package filer2
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "time"
-)
-
-func (fs *Filer) GetMaster() string {
- return fs.currentMaster
-}
-
-func (fs *Filer) KeepConnectedToMaster() {
- glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
- for {
- fs.tryAllMasters()
- time.Sleep(time.Second)
- }
-}
-
-func (fs *Filer) tryAllMasters() {
- for _, master := range fs.masters {
- glog.V(0).Infof("Connecting to %v", master)
- withMasterClient(master, func(client master_pb.SeaweedClient) error {
- stream, err := client.KeepConnected(context.Background())
- if err != nil {
- glog.V(0).Infof("failed to keep connected to %s: %v", master, err)
- return err
- }
-
- glog.V(0).Infof("Connected to %v", master)
- fs.currentMaster = master
-
- if err = stream.Send(&master_pb.ClientListenRequest{Name: "filer"}); err != nil {
- glog.V(0).Infof("failed to send to %s: %v", master, err)
- return err
- }
-
- for {
- if volumeLocation, err := stream.Recv(); err != nil {
- glog.V(0).Infof("failed to receive from %s: %v", master, err)
- return err
- } else {
- glog.V(0).Infof("volume location: %+v", volumeLocation)
- }
- }
- })
- fs.currentMaster = ""
- }
-}
-
-func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
-
- grpcConnection, err := util.GrpcDial(master)
- if err != nil {
- return fmt.Errorf("fail to dial %s: %v", master, err)
- }
- defer grpcConnection.Close()
-
- client := master_pb.NewSeaweedClient(grpcConnection)
-
- return fn(client)
-}