aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_server.go')
-rw-r--r--weed/mq/broker/broker_server.go11
1 files changed, 7 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 3a25a9691..3a18c3971 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -86,11 +86,14 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
- mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self))
- for {
- if err := mqBroker.BrokerConnectToBalancer(string(self)); err != nil {
- glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
+ mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) {
+ balancer := mqBroker.lockAsBalancer.LockOwner()
+ if err := mqBroker.BrokerConnectToBalancer(balancer); err != nil {
+ glog.V(0).Infof("BrokerConnectToBalancer %s: %v", balancer, err)
}
+ })
+ for {
+
time.Sleep(time.Second)
if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil {
glog.V(0).Infof("AttemptToLock: %v", err)