diff options
Diffstat (limited to 'weed/election/cluster.go')
| -rw-r--r-- | weed/election/cluster.go | 64 |
1 files changed, 56 insertions, 8 deletions
diff --git a/weed/election/cluster.go b/weed/election/cluster.go index 7c7c1089b..7f247f2cf 100644 --- a/weed/election/cluster.go +++ b/weed/election/cluster.go @@ -2,6 +2,7 @@ package election import ( "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "math" "sync" "time" @@ -31,14 +32,14 @@ func NewCluster() *Cluster { } } -func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { +func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { switch nodeType { case "filer": cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; found { existingNode.counter++ - return + return nil } cluster.nodes[address] = &ClusterNode{ Address: address, @@ -46,27 +47,29 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress counter: 1, createdTs: time.Now(), } - cluster.ensureLeader(true, address) + return cluster.ensureLeader(true, nodeType, address) case "master": } + return nil } -func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { +func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { switch nodeType { case "filer": cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; !found { - return + return nil } else { existingNode.counter-- if existingNode.counter <= 0 { delete(cluster.nodes, address) - cluster.ensureLeader(false, address) + return cluster.ensureLeader(false, nodeType, address) } } case "master": } + return nil } func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { @@ -82,13 +85,40 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) return } -func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { +func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { if isAdd { if cluster.leaders.addLeaderIfVacant(address) { // has added the address as one leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: true, + }, + }) + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: true, + }, + }) } } else { if cluster.leaders.removeLeaderIfExists(address) { + + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: false, + }, + }) + // pick the freshest one, since it is less likely to go away var shortestDuration int64 = math.MaxInt64 now := time.Now() @@ -105,10 +135,28 @@ func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { } if candidateAddress != "" { cluster.leaders.addLeaderIfVacant(candidateAddress) + // added a new leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(candidateAddress), + IsLeader: true, + IsAdd: true, + }, + }) } - // removed the leader, and maybe added a new leader + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: false, + }, + }) } } + return } func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { |
