diff options
Diffstat (limited to 'weed/wdclient')
| -rw-r--r-- | weed/wdclient/masterclient.go | 82 | ||||
| -rw-r--r-- | weed/wdclient/topolisenter/client_grpc_to_master.go | 85 | ||||
| -rw-r--r-- | weed/wdclient/topolisenter/cluster_listener.go | 56 | ||||
| -rw-r--r-- | weed/wdclient/wdclient.go | 37 |
4 files changed, 89 insertions, 171 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go new file mode 100644 index 000000000..fb634d0f0 --- /dev/null +++ b/weed/wdclient/masterclient.go @@ -0,0 +1,82 @@ +package wdclient + +import ( + "context" + "time" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type MasterClient struct { + ctx context.Context + name string + currentMaster string + masters []string +} + +func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient { + return &MasterClient{ + ctx: ctx, + name: clientName, + masters: masters, + } +} + +func (mc *MasterClient) GetMaster() string { + return mc.currentMaster +} + +func (mc *MasterClient) KeepConnectedToMaster() { + glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters) + for { + mc.tryAllMasters() + time.Sleep(time.Second) + } +} + +func (mc *MasterClient) tryAllMasters() { + for _, master := range mc.masters { + glog.V(0).Infof("Connecting to %v", master) + withMasterClient(master, func(client master_pb.SeaweedClient) error { + stream, err := client.KeepConnected(context.Background()) + if err != nil { + glog.V(0).Infof("failed to keep connected to %s: %v", master, err) + return err + } + + glog.V(0).Infof("Connected to %v", master) + mc.currentMaster = master + + if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil { + glog.V(0).Infof("failed to send to %s: %v", master, err) + return err + } + + for { + if volumeLocation, err := stream.Recv(); err != nil { + glog.V(0).Infof("failed to receive from %s: %v", master, err) + return err + } else { + glog.V(0).Infof("volume location: %+v", volumeLocation) + } + } + }) + mc.currentMaster = "" + } +} + +func withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { + + grpcConnection, err := util.GrpcDial(master) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", master, err) + } + defer grpcConnection.Close() + + client := master_pb.NewSeaweedClient(grpcConnection) + + return fn(client) +} diff --git a/weed/wdclient/topolisenter/client_grpc_to_master.go b/weed/wdclient/topolisenter/client_grpc_to_master.go deleted file mode 100644 index 863f79a1d..000000000 --- a/weed/wdclient/topolisenter/client_grpc_to_master.go +++ /dev/null @@ -1,85 +0,0 @@ -package clusterlistener - -import ( - "context" - "fmt" - "io" - - "google.golang.org/grpc" - - "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" - "github.com/golang/glog" -) - -func (clusterListener *ClusterListener) establishConnectionWithMaster( - master string, msgChan chan *pb.ClusterStatusMessage) error { - grpcConnection, err := grpc.Dial(master, grpc.WithInsecure()) - if err != nil { - return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err) - } - defer func() { _ = grpcConnection.Close() }() - - masterClient := pb.NewAlpineMasterClient(grpcConnection) - - stream, err := masterClient.RegisterClient(context.Background()) - if err != nil { - return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err) - } - - // TODO possible goroutine leaks if retry happens - go func() { - for keyspace := range clusterListener.clusters { - // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter) - if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil { - // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err) - return - } - } - - for { - msg := <-clusterListener.keyspaceFollowMessageChan - if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil { - if msg.isUnfollow { - glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err) - } else { - glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err) - } - return - } - } - - }() - - // glog.V(2).Infof("Reporting allocated %v", as.allocatedResource) - - // glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master) - - for { - msg, err := stream.Recv() - if err == io.EOF { - // read done. - return nil - } - if err != nil { - return fmt.Errorf("client receive topology : %v", err) - } - msgChan <- msg - // glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg) - } - -} - -func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error { - clientHeartbeat := &pb.ClientHeartbeat{ - ClientName: clientName, - ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{ - Keyspace: keyspace, - IsUnfollow: isUnfollow, - }, - } - - if err := stream.Send(clientHeartbeat); err != nil { - return fmt.Errorf("%s client send heartbeat: %v", clientName, err) - } - return nil -} diff --git a/weed/wdclient/topolisenter/cluster_listener.go b/weed/wdclient/topolisenter/cluster_listener.go deleted file mode 100644 index 91ca6fb6f..000000000 --- a/weed/wdclient/topolisenter/cluster_listener.go +++ /dev/null @@ -1,56 +0,0 @@ -package clusterlistener - -import ( - "context" - "sync" - "time" - - "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" - "code.uber.internal/fraud/alpine/server/util" - "github.com/chrislusf/seaweedfs/weed/storage" -) - -type Location struct { - Url string - PublicUrl string -} - -type ClusterListener struct { - sync.RWMutex - vid2locations map[storage.VolumeId][]*Location - clientName string -} - -func NewClusterListener(clientName string) *ClusterListener { - return &ClusterListener{ - vid2locations: make(map[storage.VolumeId][]*Location), - clientName: clientName, - } -} - -// StartListener keeps the listener connected to the master. -func (clusterListener *ClusterListener) StartListener(ctx context.Context, master string) { - - clusterUpdatesChan := make(chan *pb.ClusterStatusMessage) - - go util.RetryForever(ctx, clusterListener.clientName+" cluster listener", func() error { - return clusterListener.establishConnectionWithMaster(master, clusterUpdatesChan) - }, 2*time.Second) - - go func() { - for { - select { - case msg := <-clusterUpdatesChan: - clusterListener.processClusterStatusMessage(msg) - } - } - }() - - // println("client is connected to master", master, "data center", dataCenter) - - return - -} - -func (clusterListener *ClusterListener) processClusterStatusMessage(msg *pb.ClusterStatusMessage) { -} diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go index cbe03359f..b16e239fb 100644 --- a/weed/wdclient/wdclient.go +++ b/weed/wdclient/wdclient.go @@ -2,41 +2,18 @@ package wdclient import ( "context" - "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" ) type SeaweedClient struct { - ctx context.Context - Master string - ClientName string - ClusterListener *clusterlistener.ClusterListener + ctx context.Context + Master string + ClientName string } -// NewSeaweedClient creates a SeaweedFS client which contains a listener for the Seaweed system topology changes -func NewSeaweedClient(ctx context.Context, clientName, master string) *SeaweedClient { - c := &SeaweedClient{ - ctx: ctx, - Master: master, - ClusterListener: clusterlistener.NewClusterListener(clientName), - ClientName: clientName, - } - c.ClusterListener.StartListener(ctx, c.Master) +func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient { + return &SeaweedClient{ + ctx: ctx, + ClientName: clientName, - conn, err := grpc.Dial(c.Master, grpc.WithInsecure()) - if err != nil { - glog.Fatalf("%s fail to dial %v: %v", c.ClientName, c.Master, err) } - c.MasterClient = pb.NewAlpineMasterClient(conn) - - return c -} - -// NewClusterClient create a lightweight client to access a specific cluster -// TODO The call will block if the keyspace is not created in this data center. -func (c *SeaweedClient) NewClusterClient(keyspace string) (clusterClient *ClusterClient) { - - return &ClusterClient{ - keyspace: keyspace, - } - } |
