diff options
Diffstat (limited to 'weed/mq/broker/broker_server.go')
| -rw-r--r-- | weed/mq/broker/broker_server.go | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 3fd01fb53..dbd854250 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -2,7 +2,9 @@ package broker import ( "context" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" "time" "google.golang.org/grpc" @@ -14,6 +16,8 @@ import ( ) type MessageQueueBrokerOption struct { + Masters map[string]pb.ServerAddress + FilerGroup string Filers []pb.ServerAddress DefaultReplication string MaxMB int @@ -26,23 +30,26 @@ type MessageQueueBroker struct { mq_pb.UnimplementedSeaweedMessagingServer option *MessageQueueBrokerOption grpcDialOption grpc.DialOption + MasterClient *wdclient.MasterClient topicManager *TopicManager } -func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) { +func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { - messageBroker = &MessageQueueBroker{ + mqBroker = &MessageQueueBroker{ option: option, grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), "", option.Masters), } - messageBroker.topicManager = NewTopicManager(messageBroker) + mqBroker.topicManager = NewTopicManager(mqBroker) - messageBroker.checkFilers() + mqBroker.checkFilers() - go messageBroker.keepConnectedToOneFiler() + go mqBroker.keepConnectedToOneFiler() + go mqBroker.MasterClient.KeepConnectedToMaster() - return messageBroker, nil + return mqBroker, nil } func (broker *MessageQueueBroker) keepConnectedToOneFiler() { |
