diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-11-02 23:38:45 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-11-02 23:38:45 -0700 |
| commit | 5160eb08f7665409221ebb0b9db6f4820e29bed3 (patch) | |
| tree | f63464dd23ee0742bd96afe52826b477a775dc5e /weed/server | |
| parent | 18bfbf62fcc64be380293ce797ab23f785c01760 (diff) | |
| download | seaweedfs-5160eb08f7665409221ebb0b9db6f4820e29bed3.tar.xz seaweedfs-5160eb08f7665409221ebb0b9db6f4820e29bed3.zip | |
shell: optionally read filer address from master
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server.go | 22 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_cluster.go | 20 | ||||
| -rw-r--r-- | weed/server/master_server.go | 3 | ||||
| -rw-r--r-- | weed/server/master_server_cluster.go | 70 |
4 files changed, 99 insertions, 16 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index c669adaa6..53e28c5bf 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -6,7 +6,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" "net" - "strings" "time" "github.com/chrislusf/raft" @@ -195,9 +194,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ // buffer by 1 so we don't end up getting stuck writing to stopChan forever stopChan := make(chan bool, 1) - clientName, messageChan := ms.addClient(req.Name, peerAddress) + clientName, messageChan := ms.addClient(req.ClientType, peerAddress) + ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) - defer ms.deleteClient(clientName) + defer func() { + ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) + ms.deleteClient(clientName) + }() for _, message := range ms.Topo.ToVolumeLocations() { if sendErr := stream.Send(message); sendErr != nil { @@ -295,19 +298,6 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string { } -func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) { - resp := &master_pb.ListMasterClientsResponse{} - ms.clientChansLock.RLock() - defer ms.clientChansLock.RUnlock() - - for k := range ms.clientChans { - if strings.HasPrefix(k, req.ClientType+"@") { - resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:]) - } - } - return resp, nil -} - func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { // tell the volume servers about the leader diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go new file mode 100644 index 000000000..68801a3ba --- /dev/null +++ b/weed/server/master_grpc_server_cluster.go @@ -0,0 +1,20 @@ +package weed_server + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) + +func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) { + resp := &master_pb.ListClusterNodesResponse{} + + clusterNodes := ms.Cluster.ListClusterNode(req.ClientType) + + for _, node := range clusterNodes { + resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{ + Address: string(node.address), + Version: node.version, + }) + } + return resp, nil +} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 3b3b1c94b..653c4e003 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -67,6 +67,8 @@ type MasterServer struct { MasterClient *wdclient.MasterClient adminLocks *AdminLocks + + Cluster *Cluster } func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer { @@ -103,6 +105,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), adminLocks: NewAdminLocks(), + Cluster: NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) diff --git a/weed/server/master_server_cluster.go b/weed/server/master_server_cluster.go new file mode 100644 index 000000000..52e1526f9 --- /dev/null +++ b/weed/server/master_server_cluster.go @@ -0,0 +1,70 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/pb" + "sync" +) + +type NodeType int + +const ( + filerNodeType NodeType = iota +) + +type ClusterNode struct { + address pb.ServerAddress + version string +} + +type Cluster struct { + filers map[pb.ServerAddress]*ClusterNode + filersLock sync.RWMutex +} + +func NewCluster() *Cluster { + return &Cluster{ + filers: make(map[pb.ServerAddress]*ClusterNode), + } +} + +func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { + switch nodeType { + case "filer": + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() + if _, found := cluster.filers[address]; found { + return + } + cluster.filers[address] = &ClusterNode{ + address: address, + version: version, + } + case "master": + } +} + +func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { + switch nodeType { + case "filer": + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() + if _, found := cluster.filers[address]; !found { + return + } + delete(cluster.filers, address) + case "master": + } +} + +func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode){ + switch nodeType { + case "filer": + cluster.filersLock.RLock() + defer cluster.filersLock.RUnlock() + for _, node := range cluster.filers { + nodes = append(nodes, node) + } + case "master": + } + return +} |
