aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/msg_broker.go10
-rw-r--r--weed/command/server.go24
2 files changed, 25 insertions, 9 deletions
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index bab1083ab..b4b5855ff 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -20,10 +20,10 @@ import (
)
var (
- messageBrokerStandaloneOptions QueueOptions
+ messageBrokerStandaloneOptions MessageBrokerOptions
)
-type QueueOptions struct {
+type MessageBrokerOptions struct {
filer *string
ip *string
port *int
@@ -41,8 +41,8 @@ func init() {
}
var cmdMsgBroker = &Command{
- UsageLine: "msg.broker [-port=17777] [-filer=<ip:port>]",
- Short: "<WIP> start a message queue broker",
+ UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
+ Short: "start a message queue broker",
Long: `start a message queue broker
The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
@@ -59,7 +59,7 @@ func runMsgBroker(cmd *Command, args []string) bool {
}
-func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
+func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
diff --git a/weed/command/server.go b/weed/command/server.go
index 0d54004f8..c006f00eb 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -18,10 +18,11 @@ type ServerOptions struct {
}
var (
- serverOptions ServerOptions
- masterOptions MasterOptions
- filerOptions FilerOptions
- s3Options S3Options
+ serverOptions ServerOptions
+ masterOptions MasterOptions
+ filerOptions FilerOptions
+ s3Options S3Options
+ msgBrokerOptions MessageBrokerOptions
)
func init() {
@@ -57,6 +58,7 @@ var (
pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
+ isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string
)
@@ -98,6 +100,8 @@ func init() {
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+ msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
+
}
func runServer(cmd *Command, args []string) bool {
@@ -117,6 +121,9 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingS3 {
*isStartingFiler = true
}
+ if *isStartingMsgBroker {
+ *isStartingFiler = true
+ }
_, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
peers := strings.Join(peerList, ",")
@@ -133,6 +140,7 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
+ msgBrokerOptions.ip = serverIp
serverOptions.v.pulseSeconds = pulseSeconds
masterOptions.pulseSeconds = pulseSeconds
@@ -145,6 +153,7 @@ func runServer(cmd *Command, args []string) bool {
filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port)
s3Options.filer = &filerAddress
+ msgBrokerOptions.filer = &filerAddress
if *filerOptions.defaultReplicaPlacement == "" {
*filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
@@ -188,6 +197,13 @@ func runServer(cmd *Command, args []string) bool {
}()
}
+ if *isStartingMsgBroker {
+ go func() {
+ time.Sleep(2 * time.Second)
+ msgBrokerOptions.startQueueServer()
+ }()
+ }
+
// start volume server
{
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)