aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server.go22
-rw-r--r--weed/server/master_grpc_server_cluster.go20
-rw-r--r--weed/server/master_server.go3
-rw-r--r--weed/server/master_server_cluster.go70
4 files changed, 99 insertions, 16 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index c669adaa6..53e28c5bf 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -6,7 +6,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
"net"
- "strings"
"time"
"github.com/chrislusf/raft"
@@ -195,9 +194,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
// buffer by 1 so we don't end up getting stuck writing to stopChan forever
stopChan := make(chan bool, 1)
- clientName, messageChan := ms.addClient(req.Name, peerAddress)
+ clientName, messageChan := ms.addClient(req.ClientType, peerAddress)
+ ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version)
- defer ms.deleteClient(clientName)
+ defer func() {
+ ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress)
+ ms.deleteClient(clientName)
+ }()
for _, message := range ms.Topo.ToVolumeLocations() {
if sendErr := stream.Send(message); sendErr != nil {
@@ -295,19 +298,6 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string {
}
-func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
- resp := &master_pb.ListMasterClientsResponse{}
- ms.clientChansLock.RLock()
- defer ms.clientChansLock.RUnlock()
-
- for k := range ms.clientChans {
- if strings.HasPrefix(k, req.ClientType+"@") {
- resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
- }
- }
- return resp, nil
-}
-
func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
// tell the volume servers about the leader
diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go
new file mode 100644
index 000000000..68801a3ba
--- /dev/null
+++ b/weed/server/master_grpc_server_cluster.go
@@ -0,0 +1,20 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+)
+
+func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) {
+ resp := &master_pb.ListClusterNodesResponse{}
+
+ clusterNodes := ms.Cluster.ListClusterNode(req.ClientType)
+
+ for _, node := range clusterNodes {
+ resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
+ Address: string(node.address),
+ Version: node.version,
+ })
+ }
+ return resp, nil
+}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 3b3b1c94b..653c4e003 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -67,6 +67,8 @@ type MasterServer struct {
MasterClient *wdclient.MasterClient
adminLocks *AdminLocks
+
+ Cluster *Cluster
}
func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer {
@@ -103,6 +105,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers),
adminLocks: NewAdminLocks(),
+ Cluster: NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
diff --git a/weed/server/master_server_cluster.go b/weed/server/master_server_cluster.go
new file mode 100644
index 000000000..52e1526f9
--- /dev/null
+++ b/weed/server/master_server_cluster.go
@@ -0,0 +1,70 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "sync"
+)
+
+type NodeType int
+
+const (
+ filerNodeType NodeType = iota
+)
+
+type ClusterNode struct {
+ address pb.ServerAddress
+ version string
+}
+
+type Cluster struct {
+ filers map[pb.ServerAddress]*ClusterNode
+ filersLock sync.RWMutex
+}
+
+func NewCluster() *Cluster {
+ return &Cluster{
+ filers: make(map[pb.ServerAddress]*ClusterNode),
+ }
+}
+
+func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) {
+ switch nodeType {
+ case "filer":
+ cluster.filersLock.Lock()
+ defer cluster.filersLock.Unlock()
+ if _, found := cluster.filers[address]; found {
+ return
+ }
+ cluster.filers[address] = &ClusterNode{
+ address: address,
+ version: version,
+ }
+ case "master":
+ }
+}
+
+func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) {
+ switch nodeType {
+ case "filer":
+ cluster.filersLock.Lock()
+ defer cluster.filersLock.Unlock()
+ if _, found := cluster.filers[address]; !found {
+ return
+ }
+ delete(cluster.filers, address)
+ case "master":
+ }
+}
+
+func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode){
+ switch nodeType {
+ case "filer":
+ cluster.filersLock.RLock()
+ defer cluster.filersLock.RUnlock()
+ for _, node := range cluster.filers {
+ nodes = append(nodes, node)
+ }
+ case "master":
+ }
+ return
+}