aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_server.go')
-rw-r--r--weed/mq/broker/broker_server.go22
1 files changed, 22 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 90c6179cb..db8329989 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,6 +1,8 @@
package broker
import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"time"
@@ -36,6 +38,7 @@ type MessageQueueBroker struct {
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
Balancer *balancer.Balancer
+ lockAsBalancer *cluster.LiveLock
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@@ -57,6 +60,25 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker.OnBrokerUpdate(newNode, time.Now())
}
+ // keep connecting to balancer
+ go func() {
+ for mqBroker.currentFiler == "" {
+ time.Sleep(time.Millisecond * 237)
+ }
+ self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
+ glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
+
+ lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
+ mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self)
+ for {
+ err := mqBroker.BrokerConnectToBalancer(self)
+ if err != nil {
+ fmt.Printf("BrokerConnectToBalancer: %v\n", err)
+ }
+ time.Sleep(time.Second)
+ }
+ }()
+
return mqBroker, nil
}