diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/cluster/cluster.go | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/cluster/cluster.go')
| -rw-r--r-- | weed/cluster/cluster.go | 315 |
1 files changed, 315 insertions, 0 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go new file mode 100644 index 000000000..6c24df44c --- /dev/null +++ b/weed/cluster/cluster.go @@ -0,0 +1,315 @@ +package cluster + +import ( + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "math" + "sync" + "time" +) + +const ( + MasterType = "master" + VolumeServerType = "volumeServer" + FilerType = "filer" + BrokerType = "broker" +) + +type FilerGroup string +type Filers struct { + filers map[pb.ServerAddress]*ClusterNode + leaders *Leaders +} +type Leaders struct { + leaders [3]pb.ServerAddress +} + +type ClusterNode struct { + Address pb.ServerAddress + Version string + counter int + CreatedTs time.Time +} + +type Cluster struct { + filerGroup2filers map[FilerGroup]*Filers + filersLock sync.RWMutex + brokers map[pb.ServerAddress]*ClusterNode + brokersLock sync.RWMutex +} + +func NewCluster() *Cluster { + return &Cluster{ + filerGroup2filers: make(map[FilerGroup]*Filers), + brokers: make(map[pb.ServerAddress]*ClusterNode), + } +} + +func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers { + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() + filers, found := cluster.filerGroup2filers[filerGroup] + if !found && createIfNotFound { + filers = &Filers{ + filers: make(map[pb.ServerAddress]*ClusterNode), + leaders: &Leaders{}, + } + cluster.filerGroup2filers[filerGroup] = filers + } + return filers +} + +func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { + filerGroup := FilerGroup(ns) + switch nodeType { + case FilerType: + filers := cluster.getFilers(filerGroup, true) + if existingNode, found := filers.filers[address]; found { + existingNode.counter++ + return nil + } + filers.filers[address] = &ClusterNode{ + Address: address, + Version: version, + counter: 1, + CreatedTs: time.Now(), + } + return cluster.ensureFilerLeaders(filers, true, filerGroup, nodeType, address) + case BrokerType: + cluster.brokersLock.Lock() + defer cluster.brokersLock.Unlock() + if existingNode, found := cluster.brokers[address]; found { + existingNode.counter++ + return nil + } + cluster.brokers[address] = &ClusterNode{ + Address: address, + Version: version, + counter: 1, + CreatedTs: time.Now(), + } + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: true, + }, + }, + } + case MasterType: + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: true, + }, + }, + } + } + return nil +} + +func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { + filerGroup := FilerGroup(ns) + switch nodeType { + case FilerType: + filers := cluster.getFilers(filerGroup, false) + if filers == nil { + return nil + } + if existingNode, found := filers.filers[address]; !found { + return nil + } else { + existingNode.counter-- + if existingNode.counter <= 0 { + delete(filers.filers, address) + return cluster.ensureFilerLeaders(filers, false, filerGroup, nodeType, address) + } + } + case BrokerType: + cluster.brokersLock.Lock() + defer cluster.brokersLock.Unlock() + if existingNode, found := cluster.brokers[address]; !found { + return nil + } else { + existingNode.counter-- + if existingNode.counter <= 0 { + delete(cluster.brokers, address) + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: false, + }, + }, + } + } + } + case MasterType: + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: false, + }, + }, + } + } + return nil +} + +func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) { + switch nodeType { + case FilerType: + filers := cluster.getFilers(filerGroup, false) + if filers == nil { + return + } + cluster.filersLock.RLock() + defer cluster.filersLock.RUnlock() + for _, node := range filers.filers { + nodes = append(nodes, node) + } + case BrokerType: + cluster.brokersLock.RLock() + defer cluster.brokersLock.RUnlock() + for _, node := range cluster.brokers { + nodes = append(nodes, node) + } + case MasterType: + } + return +} + +func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddress) bool { + filers := cluster.getFilers(filerGroup, false) + if filers == nil { + return false + } + return filers.leaders.isOneLeader(address) +} + +func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { + if isAdd { + if filers.leaders.addLeaderIfVacant(address) { + // has added the address as one leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: true, + }, + }) + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: true, + }, + }) + } + } else { + if filers.leaders.removeLeaderIfExists(address) { + + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: false, + }, + }) + + // pick the freshest one, since it is less likely to go away + var shortestDuration int64 = math.MaxInt64 + now := time.Now() + var candidateAddress pb.ServerAddress + for _, node := range filers.filers { + if filers.leaders.isOneLeader(node.Address) { + continue + } + duration := now.Sub(node.CreatedTs).Nanoseconds() + if duration < shortestDuration { + shortestDuration = duration + candidateAddress = node.Address + } + } + if candidateAddress != "" { + filers.leaders.addLeaderIfVacant(candidateAddress) + // added a new leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(candidateAddress), + IsLeader: true, + IsAdd: true, + }, + }) + } + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: false, + }, + }) + } + } + return +} + +func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { + if leaders.isOneLeader(address) { + return + } + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == "" { + leaders.leaders[i] = address + hasChanged = true + return + } + } + return +} +func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) { + if !leaders.isOneLeader(address) { + return + } + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == address { + leaders.leaders[i] = "" + hasChanged = true + return + } + } + return +} +func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool { + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] == address { + return true + } + } + return false +} +func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) { + for i := 0; i < len(leaders.leaders); i++ { + if leaders.leaders[i] != "" { + addresses = append(addresses, leaders.leaders[i]) + } + } + return +} |
