diff options
Diffstat (limited to 'weed/cluster/cluster.go')
| -rw-r--r-- | weed/cluster/cluster.go | 82 |
1 files changed, 60 insertions, 22 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index ad6e6b879..1802ecda0 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -24,6 +24,15 @@ type Leaders struct { leaders [3]pb.ServerAddress } +type DataCenter string +type Rack string +type DataCenterBrokers struct { + brokers map[Rack]*RackBrokers +} +type RackBrokers struct { + brokers map[pb.ServerAddress]*ClusterNode +} + type ClusterNode struct { Address pb.ServerAddress Version string @@ -34,14 +43,14 @@ type ClusterNode struct { type Cluster struct { filerGroup2filers map[FilerGroup]*Filers filersLock sync.RWMutex - brokers map[pb.ServerAddress]*ClusterNode + brokers map[DataCenter]*DataCenterBrokers brokersLock sync.RWMutex } func NewCluster() *Cluster { return &Cluster{ filerGroup2filers: make(map[FilerGroup]*Filers), - brokers: make(map[pb.ServerAddress]*ClusterNode), + brokers: make(map[DataCenter]*DataCenterBrokers), } } @@ -57,7 +66,7 @@ func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) return filers } -func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { +func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { filerGroup := FilerGroup(ns) switch nodeType { case FilerType: @@ -78,11 +87,24 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd case BrokerType: cluster.brokersLock.Lock() defer cluster.brokersLock.Unlock() - if existingNode, found := cluster.brokers[address]; found { - existingNode.counter++ + existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter] + if !foundDataCenter { + existingDataCenterBrokers = &DataCenterBrokers{ + brokers: make(map[Rack]*RackBrokers), + } + } + existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[rack] + if !foundRack { + existingRackBrokers = &RackBrokers{ + brokers: make(map[pb.ServerAddress]*ClusterNode), + } + } + + if existingBroker, found := existingRackBrokers.brokers[address]; found { + existingBroker.counter++ return nil } - cluster.brokers[address] = &ClusterNode{ + existingRackBrokers.brokers[address] = &ClusterNode{ Address: address, Version: version, counter: 1, @@ -111,7 +133,7 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd return nil } -func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { +func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { filerGroup := FilerGroup(ns) switch nodeType { case FilerType: @@ -133,23 +155,35 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb case BrokerType: cluster.brokersLock.Lock() defer cluster.brokersLock.Unlock() - if existingNode, found := cluster.brokers[address]; !found { + + existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter] + if !foundDataCenter { return nil - } else { - existingNode.counter-- - if existingNode.counter <= 0 { - delete(cluster.brokers, address) - return []*master_pb.KeepConnectedResponse{ - { - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsAdd: false, - }, + } + existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[Rack(rack)] + if !foundRack { + return nil + } + + existingBroker, found := existingRackBrokers.brokers[address] + if !found { + return nil + } + existingBroker.counter-- + if existingBroker.counter <= 0 { + delete(existingRackBrokers.brokers, address) + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: false, }, - } + }, } } + return nil + case MasterType: return []*master_pb.KeepConnectedResponse{ { @@ -179,8 +213,12 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) case BrokerType: cluster.brokersLock.RLock() defer cluster.brokersLock.RUnlock() - for _, node := range cluster.brokers { - nodes = append(nodes, node) + for _, dcNodes := range cluster.brokers { + for _, rackNodes := range dcNodes.brokers { + for _, node := range rackNodes.brokers { + nodes = append(nodes, node) + } + } } case MasterType: } |
