diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_server.go | 6 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_server_discovery.go | 4 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_server_publish.go | 2 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 19 |
4 files changed, 19 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_grpc_server.go b/weed/mq/broker/broker_grpc_server.go index 9aa9b1908..2cb4187ae 100644 --- a/weed/mq/broker/broker_grpc_server.go +++ b/weed/mq/broker/broker_grpc_server.go @@ -9,11 +9,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) { +func (broker *MessageQueueBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) { panic("implement me") } -func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) { +func (broker *MessageQueueBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) { resp := &mq_pb.DeleteTopicResponse{} dir, entry := genTopicDirEntry(request.Namespace, request.Topic) if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { @@ -24,7 +24,7 @@ func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.Delet return resp, nil } -func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) { +func (broker *MessageQueueBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) { panic("implement me") } diff --git a/weed/mq/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go index 0c8d70e68..e276091a9 100644 --- a/weed/mq/broker/broker_grpc_server_discovery.go +++ b/weed/mq/broker/broker_grpc_server_discovery.go @@ -26,7 +26,7 @@ If one of the pub or sub connects very late, and the system topo changed quite a */ -func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) { +func (broker *MessageQueueBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) { t := &mq_pb.FindBrokerResponse{} var peers []string @@ -61,7 +61,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBr } -func (broker *MessageBroker) checkFilers() { +func (broker *MessageQueueBroker) checkFilers() { // contact a filer about masters var masters []pb.ServerAddress diff --git a/weed/mq/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go index 4ff9ad809..eb76dd5dc 100644 --- a/weed/mq/broker/broker_grpc_server_publish.go +++ b/weed/mq/broker/broker_grpc_server_publish.go @@ -13,7 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" ) -func (broker *MessageBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { +func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { // process initial request in, err := stream.Recv() diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 3fd01fb53..dbd854250 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -2,7 +2,9 @@ package broker import ( "context" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "github.com/chrislusf/seaweedfs/weed/wdclient" "time" "google.golang.org/grpc" @@ -14,6 +16,8 @@ import ( ) type MessageQueueBrokerOption struct { + Masters map[string]pb.ServerAddress + FilerGroup string Filers []pb.ServerAddress DefaultReplication string MaxMB int @@ -26,23 +30,26 @@ type MessageQueueBroker struct { mq_pb.UnimplementedSeaweedMessagingServer option *MessageQueueBrokerOption grpcDialOption grpc.DialOption + MasterClient *wdclient.MasterClient topicManager *TopicManager } -func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) { +func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { - messageBroker = &MessageQueueBroker{ + mqBroker = &MessageQueueBroker{ option: option, grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), "", option.Masters), } - messageBroker.topicManager = NewTopicManager(messageBroker) + mqBroker.topicManager = NewTopicManager(mqBroker) - messageBroker.checkFilers() + mqBroker.checkFilers() - go messageBroker.keepConnectedToOneFiler() + go mqBroker.keepConnectedToOneFiler() + go mqBroker.MasterClient.KeepConnectedToMaster() - return messageBroker, nil + return mqBroker, nil } func (broker *MessageQueueBroker) keepConnectedToOneFiler() { |
