aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-11-06 04:07:38 -0700
committerChris Lu <chris.lu@gmail.com>2021-11-06 11:29:50 -0700
commit330d1fde7f6a9634c1242a0490fab30cbdb12c6c (patch)
tree8b6eecb343174cc42fb3f44ab393937166359777
parent12e6692dac5a4b8152fe08c66d57eb3b0e1a4f11 (diff)
downloadseaweedfs-330d1fde7f6a9634c1242a0490fab30cbdb12c6c.tar.xz
seaweedfs-330d1fde7f6a9634c1242a0490fab30cbdb12c6c.zip
send peers info to filers
-rw-r--r--weed/election/cluster.go64
-rw-r--r--weed/server/master_grpc_server.go35
-rw-r--r--weed/server/master_server.go4
-rw-r--r--weed/wdclient/masterclient.go9
4 files changed, 86 insertions, 26 deletions
diff --git a/weed/election/cluster.go b/weed/election/cluster.go
index 7c7c1089b..7f247f2cf 100644
--- a/weed/election/cluster.go
+++ b/weed/election/cluster.go
@@ -2,6 +2,7 @@ package election
import (
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math"
"sync"
"time"
@@ -31,14 +32,14 @@ func NewCluster() *Cluster {
}
}
-func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) {
+func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
switch nodeType {
case "filer":
cluster.nodesLock.Lock()
defer cluster.nodesLock.Unlock()
if existingNode, found := cluster.nodes[address]; found {
existingNode.counter++
- return
+ return nil
}
cluster.nodes[address] = &ClusterNode{
Address: address,
@@ -46,27 +47,29 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress
counter: 1,
createdTs: time.Now(),
}
- cluster.ensureLeader(true, address)
+ return cluster.ensureLeader(true, nodeType, address)
case "master":
}
+ return nil
}
-func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) {
+func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
switch nodeType {
case "filer":
cluster.nodesLock.Lock()
defer cluster.nodesLock.Unlock()
if existingNode, found := cluster.nodes[address]; !found {
- return
+ return nil
} else {
existingNode.counter--
if existingNode.counter <= 0 {
delete(cluster.nodes, address)
- cluster.ensureLeader(false, address)
+ return cluster.ensureLeader(false, nodeType, address)
}
}
case "master":
}
+ return nil
}
func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
@@ -82,13 +85,40 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode)
return
}
-func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) {
+func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
if isAdd {
if cluster.leaders.addLeaderIfVacant(address) {
// has added the address as one leader
+ result = append(result, &master_pb.KeepConnectedResponse{
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsLeader: true,
+ IsAdd: true,
+ },
+ })
+ } else {
+ result = append(result, &master_pb.KeepConnectedResponse{
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsLeader: false,
+ IsAdd: true,
+ },
+ })
}
} else {
if cluster.leaders.removeLeaderIfExists(address) {
+
+ result = append(result, &master_pb.KeepConnectedResponse{
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsLeader: true,
+ IsAdd: false,
+ },
+ })
+
// pick the freshest one, since it is less likely to go away
var shortestDuration int64 = math.MaxInt64
now := time.Now()
@@ -105,10 +135,28 @@ func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) {
}
if candidateAddress != "" {
cluster.leaders.addLeaderIfVacant(candidateAddress)
+ // added a new leader
+ result = append(result, &master_pb.KeepConnectedResponse{
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(candidateAddress),
+ IsLeader: true,
+ IsAdd: true,
+ },
+ })
}
- // removed the leader, and maybe added a new leader
+ } else {
+ result = append(result, &master_pb.KeepConnectedResponse{
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsLeader: false,
+ IsAdd: false,
+ },
+ })
}
}
+ return
}
func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 1e4bbd8e4..7411bbc99 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -44,11 +44,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(message.DeletedVids) > 0 {
- ms.clientChansLock.RLock()
- for _, ch := range ms.clientChans {
- ch <- message
- }
- ms.clientChansLock.RUnlock()
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
}
}()
@@ -153,12 +149,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
- ms.clientChansLock.RLock()
- for host, ch := range ms.clientChans {
- glog.V(0).Infof("master send to %s: %s", host, message.String())
- ch <- message
- }
- ms.clientChansLock.RUnlock()
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
// tell the volume servers about the leader
@@ -195,10 +186,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
stopChan := make(chan bool, 1)
clientName, messageChan := ms.addClient(req.ClientType, peerAddress)
- ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version)
+ for _, update := range ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) {
+ ms.broadcastToClients(update)
+ }
defer func() {
- ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress)
+ for _, update := range ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) {
+ ms.broadcastToClients(update)
+ }
ms.deleteClient(clientName)
}()
@@ -223,7 +218,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
for {
select {
case message := <-messageChan:
- if err := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); err != nil {
+ if err := stream.Send(message); err != nil {
glog.V(0).Infof("=> client %v: %+v", clientName, message)
return err
}
@@ -238,6 +233,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
+func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) {
+ ms.clientChansLock.RLock()
+ for _, ch := range ms.clientChans {
+ ch <- message
+ }
+ ms.clientChansLock.RUnlock()
+}
+
func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
leader, err := ms.Topo.Leader()
if err != nil {
@@ -254,7 +257,7 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
return nil
}
-func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.VolumeLocation) {
+func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
clientName = clientType + "@" + string(clientAddress)
glog.V(0).Infof("+ client %v", clientName)
@@ -263,7 +266,7 @@ func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddr
// trying to send to it in SendHeartbeat and so we can't lock the
// clientChansLock to remove the channel and we're stuck writing to it
// 100 is probably overkill
- messageChan = make(chan *master_pb.VolumeLocation, 100)
+ messageChan = make(chan *master_pb.KeepConnectedResponse, 100)
ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 26ac91e8f..39812f641 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -61,7 +61,7 @@ type MasterServer struct {
// notifying clients
clientChansLock sync.RWMutex
- clientChans map[string]chan *master_pb.VolumeLocation
+ clientChans map[string]chan *master_pb.KeepConnectedResponse
grpcDialOption grpc.DialOption
@@ -102,7 +102,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre
option: option,
preallocateSize: preallocateSize,
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers),
adminLocks: NewAdminLocks(),
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 6d8e2d06e..a2f6c7ffb 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -149,6 +149,15 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
}
}
+ if resp.ClusterNodeUpdate != nil {
+ update := resp.ClusterNodeUpdate
+ if update.IsAdd {
+ glog.V(0).Infof("+ %s %s leader:%v\n", update.NodeType, update.Address, update.IsLeader)
+ } else {
+ glog.V(0).Infof("- %s %s leader:%v\n", update.NodeType, update.Address, update.IsLeader)
+ }
+ }
+
}
})