diff options
Diffstat (limited to 'weed/cluster/cluster.go')
| -rw-r--r-- | weed/cluster/cluster.go | 181 |
1 files changed, 18 insertions, 163 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index f0d8e3fb7..808adbeab 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -3,7 +3,6 @@ package cluster import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "math" "sync" "time" ) @@ -32,7 +31,6 @@ type ClusterNode struct { } type GroupMembers struct { members map[pb.ServerAddress]*ClusterNode - leaders *Leaders } type ClusterNodeGroups struct { groupMembers map[FilerGroupName]*GroupMembers @@ -53,7 +51,6 @@ func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfN if !found && createIfNotFound { members = &GroupMembers{ members: make(map[pb.ServerAddress]*ClusterNode), - leaders: &Leaders{}, } g.groupMembers[filerGroup] = members } @@ -89,12 +86,19 @@ func (m *GroupMembers) removeMember(address pb.ServerAddress) bool { return false } +func (m *GroupMembers) GetMembers() (addresses []pb.ServerAddress) { + for k := range m.members { + addresses = append(addresses, k) + } + return +} + func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { g.Lock() defer g.Unlock() m := g.getGroupMembers(filerGroup, true) if t := m.addMember(dataCenter, rack, address, version); t != nil { - return ensureGroupLeaders(m, true, filerGroup, nodeType, address) + return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address) } return nil } @@ -106,7 +110,7 @@ func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeTyp return nil } if m.removeMember(address) { - return ensureGroupLeaders(m, false, filerGroup, nodeType, address) + return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address) } return nil } @@ -122,24 +126,6 @@ func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes [] } return } -func (g *ClusterNodeGroups) IsOneLeader(filerGroup FilerGroupName, address pb.ServerAddress) bool { - g.Lock() - defer g.Unlock() - m := g.getGroupMembers(filerGroup, false) - if m == nil { - return false - } - return m.leaders.isOneLeader(address) -} -func (g *ClusterNodeGroups) ListClusterNodeLeaders(filerGroup FilerGroupName) (nodes []pb.ServerAddress) { - g.Lock() - defer g.Unlock() - m := g.getGroupMembers(filerGroup, false) - if m == nil { - return nil - } - return m.leaders.GetLeaders() -} func NewCluster() *Cluster { return &Cluster{ @@ -211,145 +197,14 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType stri return } -func (cluster *Cluster) ListClusterNodeLeaders(filerGroup FilerGroupName, nodeType string) (nodes []pb.ServerAddress) { - switch nodeType { - case FilerType: - return cluster.filerGroups.ListClusterNodeLeaders(filerGroup) - case BrokerType: - return cluster.brokerGroups.ListClusterNodeLeaders(filerGroup) - case MasterType: - } - return -} - -func (cluster *Cluster) IsOneLeader(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) bool { - switch nodeType { - case FilerType: - return cluster.filerGroups.IsOneLeader(filerGroup, address) - case BrokerType: - return cluster.brokerGroups.IsOneLeader(filerGroup, address) - case MasterType: - } - return false -} - -func ensureGroupLeaders(m *GroupMembers, isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { - if isAdd { - if m.leaders.addLeaderIfVacant(address) { - // has added the address as one leader - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: true, - IsAdd: true, - }, - }) - } else { - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: false, - IsAdd: true, - }, - }) - } - } else { - if m.leaders.removeLeaderIfExists(address) { - - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: true, - IsAdd: false, - }, - }) - - // pick the freshest one, since it is less likely to go away - var shortestDuration int64 = math.MaxInt64 - now := time.Now() - var candidateAddress pb.ServerAddress - for _, node := range m.members { - if m.leaders.isOneLeader(node.Address) { - continue - } - duration := now.Sub(node.CreatedTs).Nanoseconds() - if duration < shortestDuration { - shortestDuration = duration - candidateAddress = node.Address - } - } - if candidateAddress != "" { - m.leaders.addLeaderIfVacant(candidateAddress) - // added a new leader - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(candidateAddress), - IsLeader: true, - IsAdd: true, - }, - }) - } - } else { - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: false, - IsAdd: false, - }, - }) - } - } - return -} - -func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { - if leaders.isOneLeader(address) { - return - } - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] == "" { - leaders.leaders[i] = address - hasChanged = true - return - } - } - return -} -func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) { - if !leaders.isOneLeader(address) { - return - } - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] == address { - leaders.leaders[i] = "" - hasChanged = true - return - } - } - return -} -func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool { - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] == address { - return true - } - } - return false -} -func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) { - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] != "" { - addresses = append(addresses, leaders.leaders[i]) - } - } +func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsAdd: isAdd, + }, + }) return } |
