aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_server.go')
-rw-r--r--weed/mq/broker/broker_server.go10
1 files changed, 6 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index f41ec87ca..a009af693 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,7 +1,7 @@
package broker
import (
- "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
@@ -88,11 +88,13 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, string(self))
for {
- err := mqBroker.BrokerConnectToBalancer(string(self))
- if err != nil {
- fmt.Printf("BrokerConnectToBalancer: %v\n", err)
+ if err := mqBroker.BrokerConnectToBalancer(string(self)); err != nil {
+ glog.V(0).Infof("BrokerConnectToBalancer: %v", err)
}
time.Sleep(time.Second)
+ if err := mqBroker.lockAsBalancer.DoLock(lock_manager.MaxDuration); err != nil {
+ glog.V(0).Infof("DoLock: %v", err)
+ }
}
}()