aboutsummaryrefslogtreecommitdiff
path: root/weed/command/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/server.go')
-rw-r--r--weed/command/server.go24
1 files changed, 20 insertions, 4 deletions
diff --git a/weed/command/server.go b/weed/command/server.go
index 0d54004f8..c006f00eb 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -18,10 +18,11 @@ type ServerOptions struct {
}
var (
- serverOptions ServerOptions
- masterOptions MasterOptions
- filerOptions FilerOptions
- s3Options S3Options
+ serverOptions ServerOptions
+ masterOptions MasterOptions
+ filerOptions FilerOptions
+ s3Options S3Options
+ msgBrokerOptions MessageBrokerOptions
)
func init() {
@@ -57,6 +58,7 @@ var (
pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
+ isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string
)
@@ -98,6 +100,8 @@ func init() {
s3Options.tlsCertificate = cmdServer.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
s3Options.config = cmdServer.Flag.String("s3.config", "", "path to the config file")
+ msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
+
}
func runServer(cmd *Command, args []string) bool {
@@ -117,6 +121,9 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingS3 {
*isStartingFiler = true
}
+ if *isStartingMsgBroker {
+ *isStartingFiler = true
+ }
_, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
peers := strings.Join(peerList, ",")
@@ -133,6 +140,7 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
+ msgBrokerOptions.ip = serverIp
serverOptions.v.pulseSeconds = pulseSeconds
masterOptions.pulseSeconds = pulseSeconds
@@ -145,6 +153,7 @@ func runServer(cmd *Command, args []string) bool {
filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port)
s3Options.filer = &filerAddress
+ msgBrokerOptions.filer = &filerAddress
if *filerOptions.defaultReplicaPlacement == "" {
*filerOptions.defaultReplicaPlacement = *masterOptions.defaultReplication
@@ -188,6 +197,13 @@ func runServer(cmd *Command, args []string) bool {
}()
}
+ if *isStartingMsgBroker {
+ go func() {
+ time.Sleep(2 * time.Second)
+ msgBrokerOptions.startQueueServer()
+ }()
+ }
+
// start volume server
{
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)