diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-11-06 04:07:38 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-11-06 11:29:50 -0700 |
| commit | 330d1fde7f6a9634c1242a0490fab30cbdb12c6c (patch) | |
| tree | 8b6eecb343174cc42fb3f44ab393937166359777 /weed/election/cluster.go | |
| parent | 12e6692dac5a4b8152fe08c66d57eb3b0e1a4f11 (diff) | |
| download | seaweedfs-330d1fde7f6a9634c1242a0490fab30cbdb12c6c.tar.xz seaweedfs-330d1fde7f6a9634c1242a0490fab30cbdb12c6c.zip | |
send peers info to filers
Diffstat (limited to 'weed/election/cluster.go')
| -rw-r--r-- | weed/election/cluster.go | 64 |
1 files changed, 56 insertions, 8 deletions
diff --git a/weed/election/cluster.go b/weed/election/cluster.go index 7c7c1089b..7f247f2cf 100644 --- a/weed/election/cluster.go +++ b/weed/election/cluster.go @@ -2,6 +2,7 @@ package election import ( "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "math" "sync" "time" @@ -31,14 +32,14 @@ func NewCluster() *Cluster { } } -func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { +func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { switch nodeType { case "filer": cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; found { existingNode.counter++ - return + return nil } cluster.nodes[address] = &ClusterNode{ Address: address, @@ -46,27 +47,29 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress counter: 1, createdTs: time.Now(), } - cluster.ensureLeader(true, address) + return cluster.ensureLeader(true, nodeType, address) case "master": } + return nil } -func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { +func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { switch nodeType { case "filer": cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; !found { - return + return nil } else { existingNode.counter-- if existingNode.counter <= 0 { delete(cluster.nodes, address) - cluster.ensureLeader(false, address) + return cluster.ensureLeader(false, nodeType, address) } } case "master": } + return nil } func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { @@ -82,13 +85,40 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) return } -func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { +func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { if isAdd { if cluster.leaders.addLeaderIfVacant(address) { // has added the address as one leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: true, + }, + }) + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: true, + }, + }) } } else { if cluster.leaders.removeLeaderIfExists(address) { + + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: false, + }, + }) + // pick the freshest one, since it is less likely to go away var shortestDuration int64 = math.MaxInt64 now := time.Now() @@ -105,10 +135,28 @@ func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { } if candidateAddress != "" { cluster.leaders.addLeaderIfVacant(candidateAddress) + // added a new leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(candidateAddress), + IsLeader: true, + IsAdd: true, + }, + }) } - // removed the leader, and maybe added a new leader + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: false, + }, + }) } } + return } func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { |
