diff options
| author | chrislu <chris.lu@gmail.com> | 2022-05-01 21:59:16 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-05-01 21:59:16 -0700 |
| commit | 94635e9b5c81d6afa1b54f20f5d2d2595bba2322 (patch) | |
| tree | 9e6eb799c480603598b020e4f6cdddaa21f555af /weed/cluster | |
| parent | 4bd6bea4295cd238278b20f18a15664fa3465008 (diff) | |
| download | seaweedfs-94635e9b5c81d6afa1b54f20f5d2d2595bba2322.tar.xz seaweedfs-94635e9b5c81d6afa1b54f20f5d2d2595bba2322.zip | |
filer: add filer group
Diffstat (limited to 'weed/cluster')
| -rw-r--r-- | weed/cluster/cluster.go | 132 |
1 files changed, 82 insertions, 50 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 3cff13724..86daf24e7 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -15,6 +15,15 @@ const ( BrokerType = "broker" ) +type FilerGroup string +type Filers struct { + filers map[pb.ServerAddress]*ClusterNode + leaders *Leaders +} +type Leaders struct { + leaders [3]pb.ServerAddress +} + type ClusterNode struct { Address pb.ServerAddress Version string @@ -22,42 +31,50 @@ type ClusterNode struct { createdTs time.Time } -type Leaders struct { - leaders [3]pb.ServerAddress -} - type Cluster struct { - filers map[pb.ServerAddress]*ClusterNode - filersLock sync.RWMutex - filerLeaders *Leaders - brokers map[pb.ServerAddress]*ClusterNode - brokersLock sync.RWMutex + filerGroup2filers map[FilerGroup]*Filers + filersLock sync.RWMutex + brokers map[pb.ServerAddress]*ClusterNode + brokersLock sync.RWMutex } func NewCluster() *Cluster { return &Cluster{ - filers: make(map[pb.ServerAddress]*ClusterNode), - filerLeaders: &Leaders{}, - brokers: make(map[pb.ServerAddress]*ClusterNode), + filerGroup2filers: make(map[FilerGroup]*Filers), + brokers: make(map[pb.ServerAddress]*ClusterNode), } } -func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { +func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers { + cluster.filersLock.Lock() + defer cluster.filersLock.Unlock() + filers, found := cluster.filerGroup2filers[filerGroup] + if !found && createIfNotFound { + filers = &Filers{ + filers: make(map[pb.ServerAddress]*ClusterNode), + leaders: &Leaders{}, + } + cluster.filerGroup2filers[filerGroup] = filers + } + return filers +} + +func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { + filerGroup := FilerGroup(ns) switch nodeType { case FilerType: - cluster.filersLock.Lock() - defer cluster.filersLock.Unlock() - if existingNode, found := cluster.filers[address]; found { + filers := cluster.getFilers(filerGroup, true) + if existingNode, found := filers.filers[address]; found { existingNode.counter++ return nil } - cluster.filers[address] = &ClusterNode{ + filers.filers[address] = &ClusterNode{ Address: address, Version: version, counter: 1, createdTs: time.Now(), } - return cluster.ensureFilerLeaders(true, nodeType, address) + return cluster.ensureFilerLeaders(filers, true, filerGroup, nodeType, address) case BrokerType: cluster.brokersLock.Lock() defer cluster.brokersLock.Unlock() @@ -94,18 +111,21 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress return nil } -func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { +func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { + filerGroup := FilerGroup(ns) switch nodeType { case FilerType: - cluster.filersLock.Lock() - defer cluster.filersLock.Unlock() - if existingNode, found := cluster.filers[address]; !found { + filers := cluster.getFilers(filerGroup, false) + if filers == nil { + return nil + } + if existingNode, found := filers.filers[address]; !found { return nil } else { existingNode.counter-- if existingNode.counter <= 0 { - delete(cluster.filers, address) - return cluster.ensureFilerLeaders(false, nodeType, address) + delete(filers.filers, address) + return cluster.ensureFilerLeaders(filers, false, filerGroup, nodeType, address) } } case BrokerType: @@ -142,12 +162,16 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr return nil } -func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { +func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) { switch nodeType { case FilerType: + filers := cluster.getFilers(filerGroup, false) + if filers == nil { + return + } cluster.filersLock.RLock() defer cluster.filersLock.RUnlock() - for _, node := range cluster.filers { + for _, node := range filers.filers { nodes = append(nodes, node) } case BrokerType: @@ -161,41 +185,48 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) return } -func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool { - return cluster.filerLeaders.isOneLeader(address) +func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddress) bool { + filers := cluster.getFilers(filerGroup, false) + if filers == nil { + return false + } + return filers.leaders.isOneLeader(address) } -func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { +func (cluster *Cluster) ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { if isAdd { - if cluster.filerLeaders.addLeaderIfVacant(address) { + if filers.leaders.addLeaderIfVacant(address) { // has added the address as one leader result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsLeader: true, - IsAdd: true, + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: true, }, }) } else { result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsLeader: false, - IsAdd: true, + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: true, }, }) } } else { - if cluster.filerLeaders.removeLeaderIfExists(address) { + if filers.leaders.removeLeaderIfExists(address) { result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsLeader: true, - IsAdd: false, + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: false, }, }) @@ -203,8 +234,8 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address var shortestDuration int64 = math.MaxInt64 now := time.Now() var candidateAddress pb.ServerAddress - for _, node := range cluster.filers { - if cluster.filerLeaders.isOneLeader(node.Address) { + for _, node := range filers.filers { + if filers.leaders.isOneLeader(node.Address) { continue } duration := now.Sub(node.createdTs).Nanoseconds() @@ -214,7 +245,7 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address } } if candidateAddress != "" { - cluster.filerLeaders.addLeaderIfVacant(candidateAddress) + filers.leaders.addLeaderIfVacant(candidateAddress) // added a new leader result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ @@ -228,10 +259,11 @@ func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address } else { result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsLeader: false, - IsAdd: false, + FilerGroup: string(filerGroup), + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: false, }, }) } |
