aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-07-28 02:10:32 -0700
committerChris Lu <chris.lu@gmail.com>2018-07-28 02:10:32 -0700
commit01bcc89803b5caefe6d1809d4a85bc8a1d19918e (patch)
tree27d2b7f382a4a584ab4315903c2a2978c596cc77
parent1ab8232b55e4572a4b456f02854640f74acbceef (diff)
downloadseaweedfs-01bcc89803b5caefe6d1809d4a85bc8a1d19918e.tar.xz
seaweedfs-01bcc89803b5caefe6d1809d4a85bc8a1d19918e.zip
refactor into MasterClient
-rw-r--r--weed/filer2/filer.go16
-rw-r--r--weed/wdclient/masterclient.go (renamed from weed/filer2/filer_master.go)42
-rw-r--r--weed/wdclient/topolisenter/client_grpc_to_master.go85
-rw-r--r--weed/wdclient/topolisenter/cluster_listener.go56
-rw-r--r--weed/wdclient/wdclient.go37
5 files changed, 48 insertions, 188 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 2deb8ffd5..f5c2849fe 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -11,20 +11,20 @@ import (
"path/filepath"
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "context"
)
type Filer struct {
- masters []string
store FilerStore
directoryCache *ccache.Cache
-
- currentMaster string
+ masterClient *wdclient.MasterClient
}
func NewFiler(masters []string) *Filer {
return &Filer{
- masters: masters,
directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ masterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
}
}
@@ -36,6 +36,14 @@ func (f *Filer) DisableDirectoryCache() {
f.directoryCache = nil
}
+func (fs *Filer) GetMaster() string {
+ return fs.masterClient.GetMaster()
+}
+
+func (fs *Filer) KeepConnectedToMaster() {
+ fs.masterClient.KeepConnectedToMaster()
+}
+
func (f *Filer) CreateEntry(entry *Entry) error {
dirParts := strings.Split(string(entry.FullPath), "/")
diff --git a/weed/filer2/filer_master.go b/weed/wdclient/masterclient.go
index bbac17940..fb634d0f0 100644
--- a/weed/filer2/filer_master.go
+++ b/weed/wdclient/masterclient.go
@@ -1,28 +1,44 @@
-package filer2
+package wdclient
import (
"context"
+ "time"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "time"
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
-func (fs *Filer) GetMaster() string {
- return fs.currentMaster
+type MasterClient struct {
+ ctx context.Context
+ name string
+ currentMaster string
+ masters []string
+}
+
+func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient {
+ return &MasterClient{
+ ctx: ctx,
+ name: clientName,
+ masters: masters,
+ }
+}
+
+func (mc *MasterClient) GetMaster() string {
+ return mc.currentMaster
}
-func (fs *Filer) KeepConnectedToMaster() {
- glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters)
+func (mc *MasterClient) KeepConnectedToMaster() {
+ glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters)
for {
- fs.tryAllMasters()
+ mc.tryAllMasters()
time.Sleep(time.Second)
}
}
-func (fs *Filer) tryAllMasters() {
- for _, master := range fs.masters {
+func (mc *MasterClient) tryAllMasters() {
+ for _, master := range mc.masters {
glog.V(0).Infof("Connecting to %v", master)
withMasterClient(master, func(client master_pb.SeaweedClient) error {
stream, err := client.KeepConnected(context.Background())
@@ -32,9 +48,9 @@ func (fs *Filer) tryAllMasters() {
}
glog.V(0).Infof("Connected to %v", master)
- fs.currentMaster = master
+ mc.currentMaster = master
- if err = stream.Send(&master_pb.ClientListenRequest{Name: "filer"}); err != nil {
+ if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil {
glog.V(0).Infof("failed to send to %s: %v", master, err)
return err
}
@@ -48,7 +64,7 @@ func (fs *Filer) tryAllMasters() {
}
}
})
- fs.currentMaster = ""
+ mc.currentMaster = ""
}
}
diff --git a/weed/wdclient/topolisenter/client_grpc_to_master.go b/weed/wdclient/topolisenter/client_grpc_to_master.go
deleted file mode 100644
index 863f79a1d..000000000
--- a/weed/wdclient/topolisenter/client_grpc_to_master.go
+++ /dev/null
@@ -1,85 +0,0 @@
-package clusterlistener
-
-import (
- "context"
- "fmt"
- "io"
-
- "google.golang.org/grpc"
-
- "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
- "github.com/golang/glog"
-)
-
-func (clusterListener *ClusterListener) establishConnectionWithMaster(
- master string, msgChan chan *pb.ClusterStatusMessage) error {
- grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
- if err != nil {
- return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err)
- }
- defer func() { _ = grpcConnection.Close() }()
-
- masterClient := pb.NewAlpineMasterClient(grpcConnection)
-
- stream, err := masterClient.RegisterClient(context.Background())
- if err != nil {
- return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err)
- }
-
- // TODO possible goroutine leaks if retry happens
- go func() {
- for keyspace := range clusterListener.clusters {
- // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter)
- if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil {
- // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err)
- return
- }
- }
-
- for {
- msg := <-clusterListener.keyspaceFollowMessageChan
- if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil {
- if msg.isUnfollow {
- glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
- } else {
- glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
- }
- return
- }
- }
-
- }()
-
- // glog.V(2).Infof("Reporting allocated %v", as.allocatedResource)
-
- // glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master)
-
- for {
- msg, err := stream.Recv()
- if err == io.EOF {
- // read done.
- return nil
- }
- if err != nil {
- return fmt.Errorf("client receive topology : %v", err)
- }
- msgChan <- msg
- // glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg)
- }
-
-}
-
-func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error {
- clientHeartbeat := &pb.ClientHeartbeat{
- ClientName: clientName,
- ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{
- Keyspace: keyspace,
- IsUnfollow: isUnfollow,
- },
- }
-
- if err := stream.Send(clientHeartbeat); err != nil {
- return fmt.Errorf("%s client send heartbeat: %v", clientName, err)
- }
- return nil
-}
diff --git a/weed/wdclient/topolisenter/cluster_listener.go b/weed/wdclient/topolisenter/cluster_listener.go
deleted file mode 100644
index 91ca6fb6f..000000000
--- a/weed/wdclient/topolisenter/cluster_listener.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package clusterlistener
-
-import (
- "context"
- "sync"
- "time"
-
- "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
- "code.uber.internal/fraud/alpine/server/util"
- "github.com/chrislusf/seaweedfs/weed/storage"
-)
-
-type Location struct {
- Url string
- PublicUrl string
-}
-
-type ClusterListener struct {
- sync.RWMutex
- vid2locations map[storage.VolumeId][]*Location
- clientName string
-}
-
-func NewClusterListener(clientName string) *ClusterListener {
- return &ClusterListener{
- vid2locations: make(map[storage.VolumeId][]*Location),
- clientName: clientName,
- }
-}
-
-// StartListener keeps the listener connected to the master.
-func (clusterListener *ClusterListener) StartListener(ctx context.Context, master string) {
-
- clusterUpdatesChan := make(chan *pb.ClusterStatusMessage)
-
- go util.RetryForever(ctx, clusterListener.clientName+" cluster listener", func() error {
- return clusterListener.establishConnectionWithMaster(master, clusterUpdatesChan)
- }, 2*time.Second)
-
- go func() {
- for {
- select {
- case msg := <-clusterUpdatesChan:
- clusterListener.processClusterStatusMessage(msg)
- }
- }
- }()
-
- // println("client is connected to master", master, "data center", dataCenter)
-
- return
-
-}
-
-func (clusterListener *ClusterListener) processClusterStatusMessage(msg *pb.ClusterStatusMessage) {
-}
diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go
index cbe03359f..b16e239fb 100644
--- a/weed/wdclient/wdclient.go
+++ b/weed/wdclient/wdclient.go
@@ -2,41 +2,18 @@ package wdclient
import (
"context"
- "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
)
type SeaweedClient struct {
- ctx context.Context
- Master string
- ClientName string
- ClusterListener *clusterlistener.ClusterListener
+ ctx context.Context
+ Master string
+ ClientName string
}
-// NewSeaweedClient creates a SeaweedFS client which contains a listener for the Seaweed system topology changes
-func NewSeaweedClient(ctx context.Context, clientName, master string) *SeaweedClient {
- c := &SeaweedClient{
- ctx: ctx,
- Master: master,
- ClusterListener: clusterlistener.NewClusterListener(clientName),
- ClientName: clientName,
- }
- c.ClusterListener.StartListener(ctx, c.Master)
+func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient {
+ return &SeaweedClient{
+ ctx: ctx,
+ ClientName: clientName,
- conn, err := grpc.Dial(c.Master, grpc.WithInsecure())
- if err != nil {
- glog.Fatalf("%s fail to dial %v: %v", c.ClientName, c.Master, err)
}
- c.MasterClient = pb.NewAlpineMasterClient(conn)
-
- return c
-}
-
-// NewClusterClient create a lightweight client to access a specific cluster
-// TODO The call will block if the keyspace is not created in this data center.
-func (c *SeaweedClient) NewClusterClient(keyspace string) (clusterClient *ClusterClient) {
-
- return &ClusterClient{
- keyspace: keyspace,
- }
-
}