aboutsummaryrefslogtreecommitdiff
path: root/weed/cluster/cluster.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/cluster/cluster.go')
-rw-r--r--weed/cluster/cluster.go82
1 files changed, 60 insertions, 22 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go
index ad6e6b879..1802ecda0 100644
--- a/weed/cluster/cluster.go
+++ b/weed/cluster/cluster.go
@@ -24,6 +24,15 @@ type Leaders struct {
leaders [3]pb.ServerAddress
}
+type DataCenter string
+type Rack string
+type DataCenterBrokers struct {
+ brokers map[Rack]*RackBrokers
+}
+type RackBrokers struct {
+ brokers map[pb.ServerAddress]*ClusterNode
+}
+
type ClusterNode struct {
Address pb.ServerAddress
Version string
@@ -34,14 +43,14 @@ type ClusterNode struct {
type Cluster struct {
filerGroup2filers map[FilerGroup]*Filers
filersLock sync.RWMutex
- brokers map[pb.ServerAddress]*ClusterNode
+ brokers map[DataCenter]*DataCenterBrokers
brokersLock sync.RWMutex
}
func NewCluster() *Cluster {
return &Cluster{
filerGroup2filers: make(map[FilerGroup]*Filers),
- brokers: make(map[pb.ServerAddress]*ClusterNode),
+ brokers: make(map[DataCenter]*DataCenterBrokers),
}
}
@@ -57,7 +66,7 @@ func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool)
return filers
}
-func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
+func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
@@ -78,11 +87,24 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd
case BrokerType:
cluster.brokersLock.Lock()
defer cluster.brokersLock.Unlock()
- if existingNode, found := cluster.brokers[address]; found {
- existingNode.counter++
+ existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
+ if !foundDataCenter {
+ existingDataCenterBrokers = &DataCenterBrokers{
+ brokers: make(map[Rack]*RackBrokers),
+ }
+ }
+ existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[rack]
+ if !foundRack {
+ existingRackBrokers = &RackBrokers{
+ brokers: make(map[pb.ServerAddress]*ClusterNode),
+ }
+ }
+
+ if existingBroker, found := existingRackBrokers.brokers[address]; found {
+ existingBroker.counter++
return nil
}
- cluster.brokers[address] = &ClusterNode{
+ existingRackBrokers.brokers[address] = &ClusterNode{
Address: address,
Version: version,
counter: 1,
@@ -111,7 +133,7 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, address pb.ServerAdd
return nil
}
-func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
+func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
filerGroup := FilerGroup(ns)
switch nodeType {
case FilerType:
@@ -133,23 +155,35 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb
case BrokerType:
cluster.brokersLock.Lock()
defer cluster.brokersLock.Unlock()
- if existingNode, found := cluster.brokers[address]; !found {
+
+ existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter]
+ if !foundDataCenter {
return nil
- } else {
- existingNode.counter--
- if existingNode.counter <= 0 {
- delete(cluster.brokers, address)
- return []*master_pb.KeepConnectedResponse{
- {
- ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
- NodeType: nodeType,
- Address: string(address),
- IsAdd: false,
- },
+ }
+ existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[Rack(rack)]
+ if !foundRack {
+ return nil
+ }
+
+ existingBroker, found := existingRackBrokers.brokers[address]
+ if !found {
+ return nil
+ }
+ existingBroker.counter--
+ if existingBroker.counter <= 0 {
+ delete(existingRackBrokers.brokers, address)
+ return []*master_pb.KeepConnectedResponse{
+ {
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsAdd: false,
},
- }
+ },
}
}
+ return nil
+
case MasterType:
return []*master_pb.KeepConnectedResponse{
{
@@ -179,8 +213,12 @@ func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string)
case BrokerType:
cluster.brokersLock.RLock()
defer cluster.brokersLock.RUnlock()
- for _, node := range cluster.brokers {
- nodes = append(nodes, node)
+ for _, dcNodes := range cluster.brokers {
+ for _, rackNodes := range dcNodes.brokers {
+ for _, node := range rackNodes.brokers {
+ nodes = append(nodes, node)
+ }
+ }
}
case MasterType:
}