aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-11-09 08:50:55 -0800
committerChris Lu <chris.lu@gmail.com>2021-11-09 08:50:55 -0800
commit1d4a61af5e34726321c6f7c9e659d0660293edb7 (patch)
treec7ff677a96723fbed58c9f67be8b21d2ad209199
parent59d1435d652812a8505e8924ebafc7ccab437cb5 (diff)
downloadseaweedfs-1d4a61af5e34726321c6f7c9e659d0660293edb7.tar.xz
seaweedfs-1d4a61af5e34726321c6f7c9e659d0660293edb7.zip
add brokers
-rw-r--r--weed/cluster/cluster.go51
1 files changed, 51 insertions, 0 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go
index 08ff97198..fa01bd03d 100644
--- a/weed/cluster/cluster.go
+++ b/weed/cluster/cluster.go
@@ -29,12 +29,15 @@ type Cluster struct {
filers map[pb.ServerAddress]*ClusterNode
filersLock sync.RWMutex
filerLeaders *Leaders
+ brokers map[pb.ServerAddress]*ClusterNode
+ brokersLock sync.RWMutex
}
func NewCluster() *Cluster {
return &Cluster{
filers: make(map[pb.ServerAddress]*ClusterNode),
filerLeaders: &Leaders{},
+ brokers: make(map[pb.ServerAddress]*ClusterNode),
}
}
@@ -54,6 +57,28 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress
createdTs: time.Now(),
}
return cluster.ensureFilerLeaders(true, nodeType, address)
+ case BrokerType:
+ cluster.brokersLock.Lock()
+ defer cluster.brokersLock.Unlock()
+ if existingNode, found := cluster.brokers[address]; found {
+ existingNode.counter++
+ return nil
+ }
+ cluster.brokers[address] = &ClusterNode{
+ Address: address,
+ Version: version,
+ counter: 1,
+ createdTs: time.Now(),
+ }
+ return []*master_pb.KeepConnectedResponse{
+ {
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsAdd: true,
+ },
+ },
+ }
case MasterType:
}
return nil
@@ -73,6 +98,26 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr
return cluster.ensureFilerLeaders(false, nodeType, address)
}
}
+ case BrokerType:
+ cluster.brokersLock.Lock()
+ defer cluster.brokersLock.Unlock()
+ if existingNode, found := cluster.brokers[address]; !found {
+ return nil
+ } else {
+ existingNode.counter--
+ if existingNode.counter <= 0 {
+ delete(cluster.brokers, address)
+ return []*master_pb.KeepConnectedResponse{
+ {
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsAdd: false,
+ },
+ },
+ }
+ }
+ }
case MasterType:
}
return nil
@@ -86,6 +131,12 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode)
for _, node := range cluster.filers {
nodes = append(nodes, node)
}
+ case BrokerType:
+ cluster.brokersLock.RLock()
+ defer cluster.brokersLock.RUnlock()
+ for _, node := range cluster.brokers {
+ nodes = append(nodes, node)
+ }
case MasterType:
}
return