diff options
Diffstat (limited to 'weed/mq/broker/broker_server.go')
| -rw-r--r-- | weed/mq/broker/broker_server.go | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 3a25a9691..3a18c3971 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -86,11 +86,14 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler) lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) - mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self)) - for { - if err := mqBroker.BrokerConnectToBalancer(string(self)); err != nil { - glog.V(0).Infof("BrokerConnectToBalancer: %v", err) + mqBroker.lockAsBalancer = lockClient.StartLongLivedLock(pub_balancer.LockBrokerBalancer, string(self), func(newLockOwner string) { + balancer := mqBroker.lockAsBalancer.LockOwner() + if err := mqBroker.BrokerConnectToBalancer(balancer); err != nil { + glog.V(0).Infof("BrokerConnectToBalancer %s: %v", balancer, err) } + }) + for { + time.Sleep(time.Second) if err := mqBroker.lockAsBalancer.AttemptToLock(lock_manager.MaxDuration); err != nil { glog.V(0).Infof("AttemptToLock: %v", err) |
