aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-11-04 00:54:38 -0700
committerChris Lu <chris.lu@gmail.com>2021-11-04 00:54:38 -0700
commit77f90ae288924822c883fdfb28efd1b9a7ee90c1 (patch)
tree3bef36d511bd34eb1293b7a49397dd3988b0426c
parent35c37562bc34393853de1c54ed06740bdffdf919 (diff)
downloadseaweedfs-77f90ae288924822c883fdfb28efd1b9a7ee90c1.tar.xz
seaweedfs-77f90ae288924822c883fdfb28efd1b9a7ee90c1.zip
add leader election in master
-rw-r--r--weed/election/cluster.go155
-rw-r--r--weed/election/cluster_test.go47
-rw-r--r--weed/server/master_grpc_server_cluster.go4
-rw-r--r--weed/server/master_server.go5
-rw-r--r--weed/server/master_server_cluster.go77
5 files changed, 207 insertions, 81 deletions
diff --git a/weed/election/cluster.go b/weed/election/cluster.go
new file mode 100644
index 000000000..7c7c1089b
--- /dev/null
+++ b/weed/election/cluster.go
@@ -0,0 +1,155 @@
+package election
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "math"
+ "sync"
+ "time"
+)
+
+type ClusterNode struct {
+ Address pb.ServerAddress
+ Version string
+ counter int
+ createdTs time.Time
+}
+
+type Leaders struct {
+ leaders [3]pb.ServerAddress
+}
+
+type Cluster struct {
+ nodes map[pb.ServerAddress]*ClusterNode
+ nodesLock sync.RWMutex
+ leaders *Leaders
+}
+
+func NewCluster() *Cluster {
+ return &Cluster{
+ nodes: make(map[pb.ServerAddress]*ClusterNode),
+ leaders: &Leaders{},
+ }
+}
+
+func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) {
+ switch nodeType {
+ case "filer":
+ cluster.nodesLock.Lock()
+ defer cluster.nodesLock.Unlock()
+ if existingNode, found := cluster.nodes[address]; found {
+ existingNode.counter++
+ return
+ }
+ cluster.nodes[address] = &ClusterNode{
+ Address: address,
+ Version: version,
+ counter: 1,
+ createdTs: time.Now(),
+ }
+ cluster.ensureLeader(true, address)
+ case "master":
+ }
+}
+
+func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) {
+ switch nodeType {
+ case "filer":
+ cluster.nodesLock.Lock()
+ defer cluster.nodesLock.Unlock()
+ if existingNode, found := cluster.nodes[address]; !found {
+ return
+ } else {
+ existingNode.counter--
+ if existingNode.counter <= 0 {
+ delete(cluster.nodes, address)
+ cluster.ensureLeader(false, address)
+ }
+ }
+ case "master":
+ }
+}
+
+func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
+ switch nodeType {
+ case "filer":
+ cluster.nodesLock.RLock()
+ defer cluster.nodesLock.RUnlock()
+ for _, node := range cluster.nodes {
+ nodes = append(nodes, node)
+ }
+ case "master":
+ }
+ return
+}
+
+func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) {
+ if isAdd {
+ if cluster.leaders.addLeaderIfVacant(address) {
+ // has added the address as one leader
+ }
+ } else {
+ if cluster.leaders.removeLeaderIfExists(address) {
+ // pick the freshest one, since it is less likely to go away
+ var shortestDuration int64 = math.MaxInt64
+ now := time.Now()
+ var candidateAddress pb.ServerAddress
+ for _, node := range cluster.nodes {
+ if cluster.leaders.isOneLeader(node.Address) {
+ continue
+ }
+ duration := now.Sub(node.createdTs).Nanoseconds()
+ if duration < shortestDuration {
+ shortestDuration = duration
+ candidateAddress = node.Address
+ }
+ }
+ if candidateAddress != "" {
+ cluster.leaders.addLeaderIfVacant(candidateAddress)
+ }
+ // removed the leader, and maybe added a new leader
+ }
+ }
+}
+
+func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {
+ if leaders.isOneLeader(address) {
+ return
+ }
+ for i := 0; i < len(leaders.leaders); i++ {
+ if leaders.leaders[i] == "" {
+ leaders.leaders[i] = address
+ hasChanged = true
+ return
+ }
+ }
+ return
+}
+func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) {
+ if !leaders.isOneLeader(address) {
+ return
+ }
+ for i := 0; i < len(leaders.leaders); i++ {
+ if leaders.leaders[i] == address {
+ leaders.leaders[i] = ""
+ hasChanged = true
+ return
+ }
+ }
+ return
+}
+func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool {
+ for i := 0; i < len(leaders.leaders); i++ {
+ if leaders.leaders[i] == address {
+ return true
+ }
+ }
+ return false
+}
+func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) {
+ for i := 0; i < len(leaders.leaders); i++ {
+ if leaders.leaders[i] != "" {
+ addresses = append(addresses, leaders.leaders[i])
+ }
+ }
+ return
+}
diff --git a/weed/election/cluster_test.go b/weed/election/cluster_test.go
new file mode 100644
index 000000000..624ff27d6
--- /dev/null
+++ b/weed/election/cluster_test.go
@@ -0,0 +1,47 @@
+package election
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestClusterAddRemoveNodes(t *testing.T) {
+ c := NewCluster()
+
+ c.AddClusterNode("filer", pb.ServerAddress("111:1"), "23.45")
+ c.AddClusterNode("filer", pb.ServerAddress("111:2"), "23.45")
+ assert.Equal(t, []pb.ServerAddress{
+ pb.ServerAddress("111:1"),
+ pb.ServerAddress("111:2"),
+ }, c.leaders.GetLeaders())
+
+ c.AddClusterNode("filer", pb.ServerAddress("111:3"), "23.45")
+ c.AddClusterNode("filer", pb.ServerAddress("111:4"), "23.45")
+ assert.Equal(t, []pb.ServerAddress{
+ pb.ServerAddress("111:1"),
+ pb.ServerAddress("111:2"),
+ pb.ServerAddress("111:3"),
+ }, c.leaders.GetLeaders())
+
+ c.AddClusterNode("filer", pb.ServerAddress("111:5"), "23.45")
+ c.AddClusterNode("filer", pb.ServerAddress("111:6"), "23.45")
+ c.RemoveClusterNode("filer", pb.ServerAddress("111:4"))
+ assert.Equal(t, []pb.ServerAddress{
+ pb.ServerAddress("111:1"),
+ pb.ServerAddress("111:2"),
+ pb.ServerAddress("111:3"),
+ }, c.leaders.GetLeaders())
+
+ // remove oldest
+ c.RemoveClusterNode("filer", pb.ServerAddress("111:1"))
+ assert.Equal(t, []pb.ServerAddress{
+ pb.ServerAddress("111:6"),
+ pb.ServerAddress("111:2"),
+ pb.ServerAddress("111:3"),
+ }, c.leaders.GetLeaders())
+
+ // remove oldest
+ c.RemoveClusterNode("filer", pb.ServerAddress("111:1"))
+
+}
diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go
index 68801a3ba..8e80cade3 100644
--- a/weed/server/master_grpc_server_cluster.go
+++ b/weed/server/master_grpc_server_cluster.go
@@ -12,8 +12,8 @@ func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.Lis
for _, node := range clusterNodes {
resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
- Address: string(node.address),
- Version: node.version,
+ Address: string(node.Address),
+ Version: node.Version,
})
}
return resp, nil
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 3f1f1c082..af2f7ddd5 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/election"
"github.com/chrislusf/seaweedfs/weed/pb"
"net/http"
"net/http/httputil"
@@ -68,7 +69,7 @@ type MasterServer struct {
adminLocks *AdminLocks
- Cluster *Cluster
+ Cluster *election.Cluster
}
func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer {
@@ -105,7 +106,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers),
adminLocks: NewAdminLocks(),
- Cluster: NewCluster(),
+ Cluster: election.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
diff --git a/weed/server/master_server_cluster.go b/weed/server/master_server_cluster.go
deleted file mode 100644
index fa5280ccd..000000000
--- a/weed/server/master_server_cluster.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package weed_server
-
-import (
- "github.com/chrislusf/seaweedfs/weed/pb"
- "sync"
-)
-
-type NodeType int
-
-const (
- filerNodeType NodeType = iota
-)
-
-type ClusterNode struct {
- address pb.ServerAddress
- version string
- counter int
-}
-
-type Cluster struct {
- filers map[pb.ServerAddress]*ClusterNode
- filersLock sync.RWMutex
-}
-
-func NewCluster() *Cluster {
- return &Cluster{
- filers: make(map[pb.ServerAddress]*ClusterNode),
- }
-}
-
-func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) {
- switch nodeType {
- case "filer":
- cluster.filersLock.Lock()
- defer cluster.filersLock.Unlock()
- if existingNode, found := cluster.filers[address]; found {
- existingNode.counter++
- return
- }
- cluster.filers[address] = &ClusterNode{
- address: address,
- version: version,
- counter: 1,
- }
- case "master":
- }
-}
-
-func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) {
- switch nodeType {
- case "filer":
- cluster.filersLock.Lock()
- defer cluster.filersLock.Unlock()
- if existingNode, found := cluster.filers[address]; !found {
- return
- } else {
- existingNode.counter--
- if existingNode.counter <= 0 {
- delete(cluster.filers, address)
- }
- }
- case "master":
- }
-}
-
-func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
- switch nodeType {
- case "filer":
- cluster.filersLock.RLock()
- defer cluster.filersLock.RUnlock()
- for _, node := range cluster.filers {
- nodes = append(nodes, node)
- }
- case "master":
- }
- return
-}