aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/broker_grpc_server.go6
-rw-r--r--weed/mq/broker/broker_grpc_server_discovery.go4
-rw-r--r--weed/mq/broker/broker_grpc_server_publish.go2
-rw-r--r--weed/mq/broker/broker_server.go19
4 files changed, 19 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_grpc_server.go b/weed/mq/broker/broker_grpc_server.go
index 9aa9b1908..2cb4187ae 100644
--- a/weed/mq/broker/broker_grpc_server.go
+++ b/weed/mq/broker/broker_grpc_server.go
@@ -9,11 +9,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
-func (broker *MessageBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) {
+func (broker *MessageQueueBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) {
panic("implement me")
}
-func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) {
+func (broker *MessageQueueBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) {
resp := &mq_pb.DeleteTopicResponse{}
dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
@@ -24,7 +24,7 @@ func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.Delet
return resp, nil
}
-func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) {
+func (broker *MessageQueueBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) {
panic("implement me")
}
diff --git a/weed/mq/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go
index 0c8d70e68..e276091a9 100644
--- a/weed/mq/broker/broker_grpc_server_discovery.go
+++ b/weed/mq/broker/broker_grpc_server_discovery.go
@@ -26,7 +26,7 @@ If one of the pub or sub connects very late, and the system topo changed quite a
*/
-func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) {
+func (broker *MessageQueueBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) {
t := &mq_pb.FindBrokerResponse{}
var peers []string
@@ -61,7 +61,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBr
}
-func (broker *MessageBroker) checkFilers() {
+func (broker *MessageQueueBroker) checkFilers() {
// contact a filer about masters
var masters []pb.ServerAddress
diff --git a/weed/mq/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go
index 4ff9ad809..eb76dd5dc 100644
--- a/weed/mq/broker/broker_grpc_server_publish.go
+++ b/weed/mq/broker/broker_grpc_server_publish.go
@@ -13,7 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
-func (broker *MessageBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
// process initial request
in, err := stream.Recv()
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 3fd01fb53..dbd854250 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -2,7 +2,9 @@ package broker
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"time"
"google.golang.org/grpc"
@@ -14,6 +16,8 @@ import (
)
type MessageQueueBrokerOption struct {
+ Masters map[string]pb.ServerAddress
+ FilerGroup string
Filers []pb.ServerAddress
DefaultReplication string
MaxMB int
@@ -26,23 +30,26 @@ type MessageQueueBroker struct {
mq_pb.UnimplementedSeaweedMessagingServer
option *MessageQueueBrokerOption
grpcDialOption grpc.DialOption
+ MasterClient *wdclient.MasterClient
topicManager *TopicManager
}
-func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) {
+func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
- messageBroker = &MessageQueueBroker{
+ mqBroker = &MessageQueueBroker{
option: option,
grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), "", option.Masters),
}
- messageBroker.topicManager = NewTopicManager(messageBroker)
+ mqBroker.topicManager = NewTopicManager(mqBroker)
- messageBroker.checkFilers()
+ mqBroker.checkFilers()
- go messageBroker.keepConnectedToOneFiler()
+ go mqBroker.keepConnectedToOneFiler()
+ go mqBroker.MasterClient.KeepConnectedToMaster()
- return messageBroker, nil
+ return mqBroker, nil
}
func (broker *MessageQueueBroker) keepConnectedToOneFiler() {