diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-11-07 23:52:40 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-11-08 17:47:56 -0800 |
| commit | d9dd72ea560da14ecb3cbf06cdf94b82071a9658 (patch) | |
| tree | d9c73de95926cd39df926c76b80b40e154334e7f /weed/cluster/cluster.go | |
| parent | 73a03cd965318454203f1ff7a3abb3d4afc7bea8 (diff) | |
| download | seaweedfs-d9dd72ea560da14ecb3cbf06cdf94b82071a9658.tar.xz seaweedfs-d9dd72ea560da14ecb3cbf06cdf94b82071a9658.zip | |
rename pacakge
Diffstat (limited to 'weed/cluster/cluster.go')
| -rw-r--r-- | weed/cluster/cluster.go | 207 |
1 files changed, 207 insertions, 0 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go new file mode 100644 index 000000000..11187169e --- /dev/null +++ b/weed/cluster/cluster.go @@ -0,0 +1,207 @@ +package cluster + +import ( + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "math" + "sync" + "time" +) + +type ClusterNode struct { + Address pb.ServerAddress + Version string + counter int + createdTs time.Time +} + +type Leaders struct { + leaders [3]pb.ServerAddress +} + +type Cluster struct { + nodes map[pb.ServerAddress]*ClusterNode + nodesLock sync.RWMutex + leaders *Leaders +} + +func NewCluster() *Cluster { + return &Cluster{ + nodes: make(map[pb.ServerAddress]*ClusterNode), + leaders: &Leaders{}, + } +} + +func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { + switch nodeType { + case "filer": + cluster.nodesLock.Lock() + defer cluster.nodesLock.Unlock() + if existingNode, found := cluster.nodes[address]; found { + existingNode.counter++ + return nil + } + cluster.nodes[address] = &ClusterNode{ + Address: address, + Version: version, + counter: 1, + createdTs: time.Now(), + } + return cluster.ensureLeader(true, nodeType, address) + case "master": + } + return nil +} + +func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { + switch nodeType { + case "filer": + cluster.nodesLock.Lock() + defer cluster.nodesLock.Unlock() + if existingNode, found := cluster.nodes[address]; !found { + return nil + } else { + existingNode.counter-- + if existingNode.counter <= 0 { + delete(cluster.nodes, address) + return cluster.ensureLeader(false, nodeType, address) + } + } + case "master": + } + return nil +} + +func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { + switch nodeType { + case "filer": + cluster.nodesLock.RLock() + defer cluster.nodesLock.RUnlock() + for _, node := range cluster.nodes { + nodes = append(nodes, node) + } + case "master": + } + return +} + +func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool { + return cluster.leaders.isOneLeader(address) +} + +func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { + if isAdd { + if cluster.leaders.addLeaderIfVacant(address) { + // has added the address as one leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: true, + }, + }) + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: true, + }, + }) + } + } else { + if cluster.leaders.removeLeaderIfExists(address) { + + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: false, + }, + }) + + // pick the freshest one, since it is less likely to go away + var shortestDuration int64 = math.MaxInt64 + now := time.Now() + var candidateAddress pb.ServerAddress + for _, node := range cluster.nodes { + if cluster.leaders.isOneLeader(node.Address) { + continue + } + duration := now.Sub(node.createdTs).Nanoseconds() + if duration < shortestDuration { + shortestDuration = duration + candidateAddress = node.Address + } + } + if candidateAddress != "" { + cluster.leaders.addLeaderIfVacant(candidateAddress) + // added a new leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(candidateAddress), + IsLeader: true, + IsAdd: true, + }, + }) + } + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: false, + }, + }) + } + } + return +} + +func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { + if leaders.isOneLeader(address) { + return + } + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == "" { + leaders.leaders[i] = address + hasChanged = true + return + } + } + return +} +func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) { + if !leaders.isOneLeader(address) { + return + } + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == address { + leaders.leaders[i] = "" + hasChanged = true + return + } + } + return +} +func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool { + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == address { + return true + } + } + return false +} +func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) { + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] != "" { + addresses = append(addresses, leaders.leaders[i]) + } + } + return +} |
