diff options
Diffstat (limited to 'weed/mq/broker/broker_server.go')
| -rw-r--r-- | weed/mq/broker/broker_server.go | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 90c6179cb..db8329989 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,6 +1,8 @@ package broker import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" "time" @@ -36,6 +38,7 @@ type MessageQueueBroker struct { currentFiler pb.ServerAddress localTopicManager *topic.LocalTopicManager Balancer *balancer.Balancer + lockAsBalancer *cluster.LiveLock } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { @@ -57,6 +60,25 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial mqBroker.OnBrokerUpdate(newNode, time.Now()) } + // keep connecting to balancer + go func() { + for mqBroker.currentFiler == "" { + time.Sleep(time.Millisecond * 237) + } + self := fmt.Sprintf("%s:%d", option.Ip, option.Port) + glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler) + + lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler) + mqBroker.lockAsBalancer = lockClient.StartLock(LockBrokerBalancer, self) + for { + err := mqBroker.BrokerConnectToBalancer(self) + if err != nil { + fmt.Printf("BrokerConnectToBalancer: %v\n", err) + } + time.Sleep(time.Second) + } + }() + return mqBroker, nil } |
