diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-03 00:29:25 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-28 23:22:51 -0700 |
| commit | 68065128b83997365a0f267799026b0e520b9816 (patch) | |
| tree | bd3b4dda9a56de3fb151fc3eecf0e996af95153a /weed/cluster/cluster.go | |
| parent | 8d31e73ffd7d33d79c1ce125e1df2f9464ccbe09 (diff) | |
| download | seaweedfs-68065128b83997365a0f267799026b0e520b9816.tar.xz seaweedfs-68065128b83997365a0f267799026b0e520b9816.zip | |
add dc and rack
Diffstat (limited to 'weed/cluster/cluster.go')
| -rw-r--r-- | weed/cluster/cluster.go | 82 |
1 files changed, 60 insertions, 22 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index ad6e6b879..1802ecda0 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -24,6 +24,15 @@ type Leaders struct { leaders [3]pb.ServerAddress } +type DataCenter string +type Rack string +type DataCenterBrokers struct { + brokers map[Rack]*RackBrokers +} +type RackBrokers struct { + brokers map[pb.ServerAddress]*ClusterNode +} + type ClusterNode struct { Address pb.ServerAddress Version string @@ -34,14 +43,14 @@ type ClusterNode struct { type Cluster struct { filerGroup2filers map[FilerGroup]*Filers filersLock sync.RWMutex - brokers map[pb.ServerAddress]*ClusterNode + brokers map[DataCenter]*DataCenterBrokers brokersLock sync.RWMutex } func NewCluster() *Cluster { return &Cluster{ filerGroup2filers: make(map[FilerGroup]*Filers), - brokers: make(map[pb.ServerAddress]*ClusterNode), + brokers: make(map[DataCenter]*DataCenterBrokers), } } @@ -57,7 +66,7 @@ func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) return filers } -func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { +func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { filerGroup := FilerGroup(ns) switch nodeType { case FilerType: @@ -78,11 +87,24 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd case BrokerType: cluster.brokersLock.Lock() defer cluster.brokersLock.Unlock() - if existingNode, found := cluster.brokers[address]; found { - existingNode.counter++ + existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter] + if !foundDataCenter { + existingDataCenterBrokers = &DataCenterBrokers{ + brokers: make(map[Rack]*RackBrokers), + } + } + existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[rack] + if !foundRack { + existingRackBrokers = &RackBrokers{ + brokers: make(map[pb.ServerAddress]*ClusterNode), + } + } + + if existingBroker, found := existingRackBrokers.brokers[address]; found { + existingBroker.counter++ return nil } - cluster.brokers[address] = &ClusterNode{ + existingRackBrokers.brokers[address] = &ClusterNode{ Address: address, Version: version, counter: 1, @@ -111,7 +133,7 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd return nil } -func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { +func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { filerGroup := FilerGroup(ns) switch nodeType { case FilerType: @@ -133,23 +155,35 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb case BrokerType: cluster.brokersLock.Lock() defer cluster.brokersLock.Unlock() - if existingNode, found := cluster.brokers[address]; !found { + + existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter] + if !foundDataCenter { return nil - } else { - existingNode.counter-- - if existingNode.counter <= 0 { - delete(cluster.brokers, address) - return []*master_pb.KeepConnectedResponse{ - { - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsAdd: false, - }, + } + existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[Rack(rack)] + if !foundRack { + return nil + } + + existingBroker, found := existingRackBrokers.brokers[address] + if !found { + return nil + } + existingBroker.counter-- + if existingBroker.counter <= 0 { + delete(existingRackBrokers.brokers, address) + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: false, }, - } + }, } } + return nil + case MasterType: return []*master_pb.KeepConnectedResponse{ { @@ -179,8 +213,12 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) case BrokerType: cluster.brokersLock.RLock() defer cluster.brokersLock.RUnlock() - for _, node := range cluster.brokers { - nodes = append(nodes, node) + for _, dcNodes := range cluster.brokers { + for _, rackNodes := range dcNodes.brokers { + for _, node := range rackNodes.brokers { + nodes = append(nodes, node) + } + } } case MasterType: } |
