diff options
Diffstat (limited to 'weed/mq/broker/broker_server.go')
| -rw-r--r-- | weed/mq/broker/broker_server.go | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index f41ec87ca..a009af693 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,7 +1,7 @@ package broker import ( - "fmt" + "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator" @@ -88,11 +88,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self)) for { - err := mqBroker.BrokerConnectToBalancer(string(self)) - if err != nil { - fmt.Printf("BrokerConnectToBalancer: %v\n", err) + if err := mqBroker.BrokerConnectToBalancer(string(self)); err != nil { + glog.V(0).Infof("BrokerConnectToBalancer: %v", err) } time.Sleep(time.Second) + if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil { + glog.V(0).Infof("DoLock: %v", err) + } } }() |
