aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-07-27 01:54:45 -0700
committerChris Lu <chris.lu@gmail.com>2018-07-27 01:54:45 -0700
commit0d989491993a98753bf34569f74791c87cecb900 (patch)
treebd64cdbe7ca1156792059d9fc7647e254481ba38
parente4b7e31902d6fe181f3257fd943d5fa75a7d4d2d (diff)
downloadseaweedfs-0d989491993a98753bf34569f74791c87cecb900.tar.xz
seaweedfs-0d989491993a98753bf34569f74791c87cecb900.zip
tmp commit
-rw-r--r--weed/pb/seaweed.proto8
-rw-r--r--weed/util/http_util.go1
-rw-r--r--weed/wdclient/topolisenter/client_grpc_to_master.go85
-rw-r--r--weed/wdclient/topolisenter/cluster_listener.go56
-rw-r--r--weed/wdclient/wdclient.go42
5 files changed, 192 insertions, 0 deletions
diff --git a/weed/pb/seaweed.proto b/weed/pb/seaweed.proto
index 8f0fed72c..6ed580b00 100644
--- a/weed/pb/seaweed.proto
+++ b/weed/pb/seaweed.proto
@@ -9,6 +9,8 @@ service Seaweed {
}
rpc KeepConnected (stream Empty) returns (stream Empty) {
}
+ rpc ListenForTopoChange (stream Empty) returns (stream VolumeLocation) {
+ }
}
//////////////////////////////////////////////////
@@ -55,3 +57,9 @@ message SuperBlockExtra {
}
ErasureCoding erasure_coding = 1;
}
+
+message VolumeLocation {
+ string url = 1;
+ string public_url = 2;
+ repeated uint32 vid = 3;
+}
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 51bedcdfd..7b28d3d91 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -83,6 +83,7 @@ func Head(url string) (http.Header, error) {
if err != nil {
return nil, err
}
+ defer r.Body.Close()
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
diff --git a/weed/wdclient/topolisenter/client_grpc_to_master.go b/weed/wdclient/topolisenter/client_grpc_to_master.go
new file mode 100644
index 000000000..863f79a1d
--- /dev/null
+++ b/weed/wdclient/topolisenter/client_grpc_to_master.go
@@ -0,0 +1,85 @@
+package clusterlistener
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "google.golang.org/grpc"
+
+ "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
+ "github.com/golang/glog"
+)
+
+func (clusterListener *ClusterListener) establishConnectionWithMaster(
+ master string, msgChan chan *pb.ClusterStatusMessage) error {
+ grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
+ if err != nil {
+ return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err)
+ }
+ defer func() { _ = grpcConnection.Close() }()
+
+ masterClient := pb.NewAlpineMasterClient(grpcConnection)
+
+ stream, err := masterClient.RegisterClient(context.Background())
+ if err != nil {
+ return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err)
+ }
+
+ // TODO possible goroutine leaks if retry happens
+ go func() {
+ for keyspace := range clusterListener.clusters {
+ // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter)
+ if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil {
+ // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err)
+ return
+ }
+ }
+
+ for {
+ msg := <-clusterListener.keyspaceFollowMessageChan
+ if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil {
+ if msg.isUnfollow {
+ glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
+ } else {
+ glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
+ }
+ return
+ }
+ }
+
+ }()
+
+ // glog.V(2).Infof("Reporting allocated %v", as.allocatedResource)
+
+ // glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master)
+
+ for {
+ msg, err := stream.Recv()
+ if err == io.EOF {
+ // read done.
+ return nil
+ }
+ if err != nil {
+ return fmt.Errorf("client receive topology : %v", err)
+ }
+ msgChan <- msg
+ // glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg)
+ }
+
+}
+
+func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error {
+ clientHeartbeat := &pb.ClientHeartbeat{
+ ClientName: clientName,
+ ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{
+ Keyspace: keyspace,
+ IsUnfollow: isUnfollow,
+ },
+ }
+
+ if err := stream.Send(clientHeartbeat); err != nil {
+ return fmt.Errorf("%s client send heartbeat: %v", clientName, err)
+ }
+ return nil
+}
diff --git a/weed/wdclient/topolisenter/cluster_listener.go b/weed/wdclient/topolisenter/cluster_listener.go
new file mode 100644
index 000000000..91ca6fb6f
--- /dev/null
+++ b/weed/wdclient/topolisenter/cluster_listener.go
@@ -0,0 +1,56 @@
+package clusterlistener
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
+ "code.uber.internal/fraud/alpine/server/util"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+type Location struct {
+ Url string
+ PublicUrl string
+}
+
+type ClusterListener struct {
+ sync.RWMutex
+ vid2locations map[storage.VolumeId][]*Location
+ clientName string
+}
+
+func NewClusterListener(clientName string) *ClusterListener {
+ return &ClusterListener{
+ vid2locations: make(map[storage.VolumeId][]*Location),
+ clientName: clientName,
+ }
+}
+
+// StartListener keeps the listener connected to the master.
+func (clusterListener *ClusterListener) StartListener(ctx context.Context, master string) {
+
+ clusterUpdatesChan := make(chan *pb.ClusterStatusMessage)
+
+ go util.RetryForever(ctx, clusterListener.clientName+" cluster listener", func() error {
+ return clusterListener.establishConnectionWithMaster(master, clusterUpdatesChan)
+ }, 2*time.Second)
+
+ go func() {
+ for {
+ select {
+ case msg := <-clusterUpdatesChan:
+ clusterListener.processClusterStatusMessage(msg)
+ }
+ }
+ }()
+
+ // println("client is connected to master", master, "data center", dataCenter)
+
+ return
+
+}
+
+func (clusterListener *ClusterListener) processClusterStatusMessage(msg *pb.ClusterStatusMessage) {
+}
diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go
new file mode 100644
index 000000000..cbe03359f
--- /dev/null
+++ b/weed/wdclient/wdclient.go
@@ -0,0 +1,42 @@
+package wdclient
+
+import (
+ "context"
+ "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
+)
+
+type SeaweedClient struct {
+ ctx context.Context
+ Master string
+ ClientName string
+ ClusterListener *clusterlistener.ClusterListener
+}
+
+// NewSeaweedClient creates a SeaweedFS client which contains a listener for the Seaweed system topology changes
+func NewSeaweedClient(ctx context.Context, clientName, master string) *SeaweedClient {
+ c := &SeaweedClient{
+ ctx: ctx,
+ Master: master,
+ ClusterListener: clusterlistener.NewClusterListener(clientName),
+ ClientName: clientName,
+ }
+ c.ClusterListener.StartListener(ctx, c.Master)
+
+ conn, err := grpc.Dial(c.Master, grpc.WithInsecure())
+ if err != nil {
+ glog.Fatalf("%s fail to dial %v: %v", c.ClientName, c.Master, err)
+ }
+ c.MasterClient = pb.NewAlpineMasterClient(conn)
+
+ return c
+}
+
+// NewClusterClient create a lightweight client to access a specific cluster
+// TODO The call will block if the keyspace is not created in this data center.
+func (c *SeaweedClient) NewClusterClient(keyspace string) (clusterClient *ClusterClient) {
+
+ return &ClusterClient{
+ keyspace: keyspace,
+ }
+
+}