diff options
| author | chrislu <chris.lu@gmail.com> | 2023-06-19 18:19:13 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-06-19 18:19:13 -0700 |
| commit | 8ec1bc2c99b4f63dda5866e0c09841abc8845087 (patch) | |
| tree | a6e54bba06f6fc126835427707a848b4268aefcd /weed/cluster | |
| parent | f97e663f0db379923057171a150720ce300886c3 (diff) | |
| download | seaweedfs-8ec1bc2c99b4f63dda5866e0c09841abc8845087.tar.xz seaweedfs-8ec1bc2c99b4f63dda5866e0c09841abc8845087.zip | |
remove unused cluster node leader
Diffstat (limited to 'weed/cluster')
| -rw-r--r-- | weed/cluster/cluster.go | 181 | ||||
| -rw-r--r-- | weed/cluster/cluster_test.go | 8 | ||||
| -rw-r--r-- | weed/cluster/master_client.go | 1 |
3 files changed, 22 insertions, 168 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index f0d8e3fb7..808adbeab 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -3,7 +3,6 @@ package cluster import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "math" "sync" "time" ) @@ -32,7 +31,6 @@ type ClusterNode struct { } type GroupMembers struct { members map[pb.ServerAddress]*ClusterNode - leaders *Leaders } type ClusterNodeGroups struct { groupMembers map[FilerGroupName]*GroupMembers @@ -53,7 +51,6 @@ func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfN if !found && createIfNotFound { members = &GroupMembers{ members: make(map[pb.ServerAddress]*ClusterNode), - leaders: &Leaders{}, } g.groupMembers[filerGroup] = members } @@ -89,12 +86,19 @@ func (m *GroupMembers) removeMember(address pb.ServerAddress) bool { return false } +func (m *GroupMembers) GetMembers() (addresses []pb.ServerAddress) { + for k := range m.members { + addresses = append(addresses, k) + } + return +} + func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { g.Lock() defer g.Unlock() m := g.getGroupMembers(filerGroup, true) if t := m.addMember(dataCenter, rack, address, version); t != nil { - return ensureGroupLeaders(m, true, filerGroup, nodeType, address) + return buildClusterNodeUpdateMessage(true, filerGroup, nodeType, address) } return nil } @@ -106,7 +110,7 @@ func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeTyp return nil } if m.removeMember(address) { - return ensureGroupLeaders(m, false, filerGroup, nodeType, address) + return buildClusterNodeUpdateMessage(false, filerGroup, nodeType, address) } return nil } @@ -122,24 +126,6 @@ func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes [] } return } -func (g *ClusterNodeGroups) IsOneLeader(filerGroup FilerGroupName, address pb.ServerAddress) bool { - g.Lock() - defer g.Unlock() - m := g.getGroupMembers(filerGroup, false) - if m == nil { - return false - } - return m.leaders.isOneLeader(address) -} -func (g *ClusterNodeGroups) ListClusterNodeLeaders(filerGroup FilerGroupName) (nodes []pb.ServerAddress) { - g.Lock() - defer g.Unlock() - m := g.getGroupMembers(filerGroup, false) - if m == nil { - return nil - } - return m.leaders.GetLeaders() -} func NewCluster() *Cluster { return &Cluster{ @@ -211,145 +197,14 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType stri return } -func (cluster *Cluster) ListClusterNodeLeaders(filerGroup FilerGroupName, nodeType string) (nodes []pb.ServerAddress) { - switch nodeType { - case FilerType: - return cluster.filerGroups.ListClusterNodeLeaders(filerGroup) - case BrokerType: - return cluster.brokerGroups.ListClusterNodeLeaders(filerGroup) - case MasterType: - } - return -} - -func (cluster *Cluster) IsOneLeader(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) bool { - switch nodeType { - case FilerType: - return cluster.filerGroups.IsOneLeader(filerGroup, address) - case BrokerType: - return cluster.brokerGroups.IsOneLeader(filerGroup, address) - case MasterType: - } - return false -} - -func ensureGroupLeaders(m *GroupMembers, isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { - if isAdd { - if m.leaders.addLeaderIfVacant(address) { - // has added the address as one leader - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: true, - IsAdd: true, - }, - }) - } else { - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: false, - IsAdd: true, - }, - }) - } - } else { - if m.leaders.removeLeaderIfExists(address) { - - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: true, - IsAdd: false, - }, - }) - - // pick the freshest one, since it is less likely to go away - var shortestDuration int64 = math.MaxInt64 - now := time.Now() - var candidateAddress pb.ServerAddress - for _, node := range m.members { - if m.leaders.isOneLeader(node.Address) { - continue - } - duration := now.Sub(node.CreatedTs).Nanoseconds() - if duration < shortestDuration { - shortestDuration = duration - candidateAddress = node.Address - } - } - if candidateAddress != "" { - m.leaders.addLeaderIfVacant(candidateAddress) - // added a new leader - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(candidateAddress), - IsLeader: true, - IsAdd: true, - }, - }) - } - } else { - result = append(result, &master_pb.KeepConnectedResponse{ - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - FilerGroup: string(filerGroup), - NodeType: nodeType, - Address: string(address), - IsLeader: false, - IsAdd: false, - }, - }) - } - } - return -} - -func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { - if leaders.isOneLeader(address) { - return - } - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] == "" { - leaders.leaders[i] = address - hasChanged = true - return - } - } - return -} -func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) { - if !leaders.isOneLeader(address) { - return - } - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] == address { - leaders.leaders[i] = "" - hasChanged = true - return - } - } - return -} -func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool { - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] == address { - return true - } - } - return false -} -func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) { - for i := 0; i < len(leaders.leaders); i++ { - if leaders.leaders[i] != "" { - addresses = append(addresses, leaders.leaders[i]) - } - } +func buildClusterNodeUpdateMessage(isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsAdd: isAdd, + }, + }) return } diff --git a/weed/cluster/cluster_test.go b/weed/cluster/cluster_test.go index 53e90a7b8..7d899258c 100644 --- a/weed/cluster/cluster_test.go +++ b/weed/cluster/cluster_test.go @@ -16,7 +16,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { assert.Equal(t, []pb.ServerAddress{ pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), - }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).GetMembers) c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:3"), "23.45") c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:4"), "23.45") @@ -24,7 +24,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).GetMembers()) c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:5"), "23.45") c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:6"), "23.45") @@ -33,7 +33,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).GetMembers()) // remove oldest c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1")) @@ -41,7 +41,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { pb.ServerAddress("111:6"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).GetMembers()) // remove oldest c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1")) diff --git a/weed/cluster/master_client.go b/weed/cluster/master_client.go index 15009e132..bab2360fe 100644 --- a/weed/cluster/master_client.go +++ b/weed/cluster/master_client.go @@ -22,7 +22,6 @@ func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOp existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ NodeType: FilerType, Address: node.Address, - IsLeader: node.IsLeader, IsAdd: true, CreatedAtNs: node.CreatedAtNs, }) |
