aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/server.go34
1 files changed, 26 insertions, 8 deletions
diff --git a/weed/command/server.go b/weed/command/server.go
index 02641bbe2..0ad126dbb 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -24,14 +24,15 @@ type ServerOptions struct {
}
var (
- serverOptions ServerOptions
- masterOptions MasterOptions
- filerOptions FilerOptions
- s3Options S3Options
- sftpOptions SftpOptions
- iamOptions IamOptions
- webdavOptions WebDavOption
- mqBrokerOptions MessageQueueBrokerOptions
+ serverOptions ServerOptions
+ masterOptions MasterOptions
+ filerOptions FilerOptions
+ s3Options S3Options
+ sftpOptions SftpOptions
+ iamOptions IamOptions
+ webdavOptions WebDavOption
+ mqBrokerOptions MessageQueueBrokerOptions
+ mqAgentServerOptions MessageQueueAgentOptions
)
func init() {
@@ -78,6 +79,7 @@ var (
isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service")
isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway")
isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker")
+ isStartingMqAgent = cmdServer.Flag.Bool("mq.agent", false, "whether to start message queue agent")
False = false
)
@@ -191,6 +193,9 @@ func init() {
mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port")
+ mqAgentServerOptions.brokersString = cmdServer.Flag.String("mq.agent.brokers", "localhost:17777", "comma-separated message queue brokers")
+ mqAgentServerOptions.port = cmdServer.Flag.Int("mq.agent.port", 16777, "message queue agent gRPC listen port")
+
}
func runServer(cmd *Command, args []string) bool {
@@ -219,6 +224,10 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingMqBroker {
*isStartingFiler = true
}
+ if *isStartingMqAgent {
+ *isStartingMqBroker = true
+ *isStartingFiler = true
+ }
if *isStartingMasterServer {
_, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.portGrpc, *masterOptions.peers)
@@ -258,6 +267,8 @@ func runServer(cmd *Command, args []string) bool {
mqBrokerOptions.ip = serverIp
mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap()
mqBrokerOptions.filerGroup = filerOptions.filerGroup
+ mqAgentServerOptions.ip = serverIp
+ mqAgentServerOptions.brokers = pb.ServerAddresses(*mqAgentServerOptions.brokersString).ToAddresses()
// serverOptions.v.pulseSeconds = pulseSeconds
// masterOptions.pulseSeconds = pulseSeconds
@@ -346,6 +357,13 @@ func runServer(cmd *Command, args []string) bool {
}()
}
+ if *isStartingMqAgent {
+ go func() {
+ time.Sleep(2 * time.Second)
+ mqAgentServerOptions.startQueueAgent()
+ }()
+ }
+
// start volume server
if *isStartingVolumeServer {
minFreeSpaces := util.MustParseMinFreeSpace(*volumeMinFreeSpace, *volumeMinFreeSpacePercent)