about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: chrislu <chris.lu@gmail.com> 2024-03-09 12:56:49 -0800
committer: chrislu <chris.lu@gmail.com> 2024-03-09 12:56:49 -0800
commit: e7823ee967e0a199081605e156a84c785e9839b1 (patch)
tree: 26c0a3460b97661e1496d0aa0b6cbbf48da33024
parent: 3b28433cb1ce5cc490776cce56fb54b4c1f8ffe3 (diff)
download: seaweedfs-e7823ee967e0a199081605e156a84c785e9839b1.tar.xz
          seaweedfs-e7823ee967e0a199081605e156a84c785e9839b1.zip
retry connecting to broker leader
-rw-r--r--  weed/mq/broker/broker_connect.go  47
-rw-r--r--  weed/mq/broker/broker_server.go   15
2 files changed, 48 insertions, 14 deletions
diff --git a/weed/mq/broker/broker_connect.go b/weed/mq/broker/broker_connect.go
index 602461c82..859e330a7 100644
--- a/weed/mq/broker/broker_connect.go
+++ b/weed/mq/broker/broker_connect.go
@@ -12,7 +12,8 @@ import (
)
// BrokerConnectToBalancer connects to the broker balancer and sends stats
-func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) error {
+func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stopCh chan struct{}) error {
+
self := string(b.option.BrokerAddress())
glog.V(0).Infof("broker %s connects to balancer %s", self, brokerBalancer)
@@ -39,6 +40,13 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) erro
}
for {
+ // check if the broker is stopping
+ select {
+ case <-stopCh:
+ return nil
+ default:
+ }
+
stats := b.localTopicManager.CollectStats(time.Second * 5)
err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
Message: &mq_pb.PublisherToPubBalancerRequest_Stats{
@@ -55,7 +63,40 @@ func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string) erro
time.Sleep(time.Millisecond*5000 + time.Duration(rand.Intn(1000))*time.Millisecond)
}
-
- return nil
})
}
+
+func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh chan string) {
+ var stopPrevRunChan chan struct{}
+ for {
+ select {
+ case newBrokerBalancer := <-newBrokerBalancerCh:
+ if stopPrevRunChan != nil {
+ close(stopPrevRunChan)
+ stopPrevRunChan = nil
+ }
+ thisRunStopChan := make(chan struct{})
+ if newBrokerBalancer != "" {
+ stopPrevRunChan = thisRunStopChan
+ go func() {
+ for {
+ err := b.BrokerConnectToBalancer(newBrokerBalancer, thisRunStopChan)
+ if err != nil {
+ glog.V(0).Infof("connect to balancer %s: %v", newBrokerBalancer, err)
+ time.Sleep(time.Second)
+ } else {
+ break
+ }
+
+ select {
+ case <-thisRunStopChan:
+ return
+ default:
+ }
+
+ }
+ }()
+ }
+ }
+ }
+}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 492d088e2..9c321744b 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,7 +1,6 @@
package broker
import (
- "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
@@ -84,19 +83,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
self := option.BrokerAddress()
glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
+ newBrokerBalancerCh := make(chan string, 1)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
- // FIXME this is a blocking call, should be in a goroutine
- if err := mqBroker.BrokerConnectToBalancer(newLockOwner); err != nil {
- glog.V(0).Infof("BrokerConnectToBalancer %s: %v", newLockOwner, err)
- }
+ glog.V(0).Infof("broker %s found balanacer %s", self, newLockOwner)
+ newBrokerBalancerCh <- newLockOwner
})
- for {
- time.Sleep(lock_manager.RenewInterval)
- if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
- glog.V(4).Infof("AttemptToLock: %v", err)
- }
- }
+ mqBroker.KeepConnectedToBrokerBalancer(newBrokerBalancerCh)
}()
return mqBroker, nil