aboutsummaryrefslogtreecommitdiff
path: root/weed/cluster/cluster.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/cluster/cluster.go')
-rw-r--r--weed/cluster/cluster.go181
1 files changed, 18 insertions, 163 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go
index f0d8e3fb7..808adbeab 100644
--- a/weed/cluster/cluster.go
+++ b/weed/cluster/cluster.go
@@ -3,7 +3,6 @@ package cluster
import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "math"
"sync"
"time"
)
@@ -32,7 +31,6 @@ type ClusterNode struct {
}
type GroupMembers struct {
members map[pb.ServerAddress]*ClusterNode
- leaders *Leaders
}
type ClusterNodeGroups struct {
groupMembers map[FilerGroupName]*GroupMembers
@@ -53,7 +51,6 @@ func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfN
if !found && createIfNotFound {
members = &GroupMembers{
members: make(map[pb.ServerAddress]*ClusterNode),
- leaders: &Leaders{},
}
g.groupMembers[filerGroup] = members
}
@@ -89,12 +86,19 @@ func (m *GroupMembers) removeMember(address pb.ServerAddress) bool {
return false
}
+func (m *GroupMembers) GetMembers() (addresses []pb.ServerAddress) {
+ for k := range m.members {
+ addresses = append(addresses, k)
+ }
+ return
+}
+
func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
g.Lock()
defer g.Unlock()
m := g.getGroupMembers(filerGroup, true)
if t := m.addMember(dataCenter, rack, address, version); t != nil {
- return ensureGroupLeaders(m, true, filerGroup, nodeType, address)
+ return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address)
}
return nil
}
@@ -106,7 +110,7 @@ func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeTyp
return nil
}
if m.removeMember(address) {
- return ensureGroupLeaders(m, false, filerGroup, nodeType, address)
+ return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address)
}
return nil
}
@@ -122,24 +126,6 @@ func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes []
}
return
}
-func (g *ClusterNodeGroups) IsOneLeader(filerGroup FilerGroupName, address pb.ServerAddress) bool {
- g.Lock()
- defer g.Unlock()
- m := g.getGroupMembers(filerGroup, false)
- if m == nil {
- return false
- }
- return m.leaders.isOneLeader(address)
-}
-func (g *ClusterNodeGroups) ListClusterNodeLeaders(filerGroup FilerGroupName) (nodes []pb.ServerAddress) {
- g.Lock()
- defer g.Unlock()
- m := g.getGroupMembers(filerGroup, false)
- if m == nil {
- return nil
- }
- return m.leaders.GetLeaders()
-}
func NewCluster() *Cluster {
return &Cluster{
@@ -211,145 +197,14 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType stri
return
}
-func (cluster *Cluster) ListClusterNodeLeaders(filerGroup FilerGroupName, nodeType string) (nodes []pb.ServerAddress) {
- switch nodeType {
- case FilerType:
- return cluster.filerGroups.ListClusterNodeLeaders(filerGroup)
- case BrokerType:
- return cluster.brokerGroups.ListClusterNodeLeaders(filerGroup)
- case MasterType:
- }
- return
-}
-
-func (cluster *Cluster) IsOneLeader(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) bool {
- switch nodeType {
- case FilerType:
- return cluster.filerGroups.IsOneLeader(filerGroup, address)
- case BrokerType:
- return cluster.brokerGroups.IsOneLeader(filerGroup, address)
- case MasterType:
- }
- return false
-}
-
-func ensureGroupLeaders(m *GroupMembers, isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
- if isAdd {
- if m.leaders.addLeaderIfVacant(address) {
- // has added the address as one leader
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- FilerGroup: string(filerGroup),
- NodeType: nodeType,
- Address: string(address),
- IsLeader: true,
- IsAdd: true,
- },
- })
- } else {
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- FilerGroup: string(filerGroup),
- NodeType: nodeType,
- Address: string(address),
- IsLeader: false,
- IsAdd: true,
- },
- })
- }
- } else {
- if m.leaders.removeLeaderIfExists(address) {
-
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- FilerGroup: string(filerGroup),
- NodeType: nodeType,
- Address: string(address),
- IsLeader: true,
- IsAdd: false,
- },
- })
-
- // pick the freshest one, since it is less likely to go away
- var shortestDuration int64 = math.MaxInt64
- now := time.Now()
- var candidateAddress pb.ServerAddress
- for _, node := range m.members {
- if m.leaders.isOneLeader(node.Address) {
- continue
- }
- duration := now.Sub(node.CreatedTs).Nanoseconds()
- if duration < shortestDuration {
- shortestDuration = duration
- candidateAddress = node.Address
- }
- }
- if candidateAddress != "" {
- m.leaders.addLeaderIfVacant(candidateAddress)
- // added a new leader
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(candidateAddress),
- IsLeader: true,
- IsAdd: true,
- },
- })
- }
- } else {
- result = append(result, &master_pb.KeepConnectedResponse{
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- FilerGroup: string(filerGroup),
- NodeType: nodeType,
- Address: string(address),
- IsLeader: false,
- IsAdd: false,
- },
- })
- }
- }
- return
-}
-
-func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
- if leaders.isOneLeader(address) {
- return
- }
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] == "" {
- leaders.leaders[i] = address
- hasChanged = true
- return
- }
- }
- return
-}
-func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
- if !leaders.isOneLeader(address) {
- return
- }
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] == address {
- leaders.leaders[i] = ""
- hasChanged = true
- return
- }
- }
- return
-}
-func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] == address {
- return true
- }
- }
- return false
-}
-func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
- for i := 0; i < len(leaders.leaders); i++ {
- if leaders.leaders[i] != "" {
- addresses = append(addresses, leaders.leaders[i])
- }
- }
+func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
+ result = append(result, &master_pb.KeepConnectedResponse{
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ FilerGroup: string(filerGroup),
+ NodeType: nodeType,
+ Address: string(address),
+ IsAdd: isAdd,
+ },
+ })
return
}