aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/command.go2
-rw-r--r--weed/command/mq_broker.go (renamed from weed/command/msg_broker.go)42
-rw-r--r--weed/command/server.go28
3 files changed, 36 insertions, 36 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 7635405dc..abd1b63e9 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -33,7 +33,7 @@ var Commands = []*Command{
cmdMount,
cmdS3,
cmdIam,
- cmdMsgBroker,
+ cmdMqBroker,
cmdScaffold,
cmdServer,
cmdShell,
diff --git a/weed/command/msg_broker.go b/weed/command/mq_broker.go
index 3274f599b..a5a6e3566 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/mq_broker.go
@@ -19,10 +19,10 @@ import (
)
var (
- messageBrokerStandaloneOptions MessageBrokerOptions
+ mqBrokerStandaloneOptions MessageQueueBrokerOptions
)
-type MessageBrokerOptions struct {
+type MessageQueueBrokerOptions struct {
filer *string
ip *string
port *int
@@ -31,16 +31,16 @@ type MessageBrokerOptions struct {
}
func init() {
- cmdMsgBroker.Run = runMsgBroker // break init cycle
- messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
- messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
- messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
- messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
- messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
+ cmdMqBroker.Run = runMqBroker // break init cycle
+ mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address")
+ mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
+ mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
+ mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file")
+ mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file")
}
-var cmdMsgBroker = &Command{
- UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
+var cmdMqBroker = &Command{
+ UsageLine: "mq.broker [-port=17777] [-filer=<ip:port>]",
Short: "start a message queue broker",
Long: `start a message queue broker
@@ -50,19 +50,19 @@ var cmdMsgBroker = &Command{
`,
}
-func runMsgBroker(cmd *Command, args []string) bool {
+func runMqBroker(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
- return messageBrokerStandaloneOptions.startQueueServer()
+ return mqBrokerStandaloneOptions.startQueueServer()
}
-func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
+func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
- grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
+ grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
- filerAddress := pb.ServerAddress(*msgBrokerOpt.filer)
+ filerAddress := pb.ServerAddress(*mqBrokerOpt.filer)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
cipher := false
@@ -77,10 +77,10 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
return nil
})
if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
+ glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
break
}
}
@@ -89,15 +89,15 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
Filers: []pb.ServerAddress{filerAddress},
DefaultReplication: "",
MaxMB: 0,
- Ip: *msgBrokerOpt.ip,
- Port: *msgBrokerOpt.port,
+ Ip: *mqBrokerOpt.ip,
+ Port: *mqBrokerOpt.port,
Cipher: cipher,
}, grpcDialOption)
// start grpc listener
- grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0)
+ grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0)
if err != nil {
- glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
+ glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
diff --git a/weed/command/server.go b/weed/command/server.go
index b1812bb9b..7c14fd14f 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -24,13 +24,13 @@ type ServerOptions struct {
}
var (
- serverOptions ServerOptions
- masterOptions MasterOptions
- filerOptions FilerOptions
- s3Options S3Options
- iamOptions IamOptions
- webdavOptions WebDavOption
- msgBrokerOptions MessageBrokerOptions
+ serverOptions ServerOptions
+ masterOptions MasterOptions
+ filerOptions FilerOptions
+ s3Options S3Options
+ iamOptions IamOptions
+ webdavOptions WebDavOption
+ mqBrokerOptions MessageQueueBrokerOptions
)
func init() {
@@ -74,7 +74,7 @@ var (
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service")
isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway")
- isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
+ isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker")
serverWhiteList []string
@@ -155,7 +155,7 @@ func init() {
webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
- msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
+ mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port")
}
@@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingWebDav {
*isStartingFiler = true
}
- if *isStartingMsgBroker {
+ if *isStartingMqBroker {
*isStartingFiler = true
}
@@ -208,7 +208,7 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
- msgBrokerOptions.ip = serverIp
+ mqBrokerOptions.ip = serverIp
// serverOptions.v.pulseSeconds = pulseSeconds
// masterOptions.pulseSeconds = pulseSeconds
@@ -224,7 +224,7 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress
iamOptions.filer = &filerAddress
webdavOptions.filer = &filerAddress
- msgBrokerOptions.filer = &filerAddress
+ mqBrokerOptions.filer = &filerAddress
go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
@@ -276,10 +276,10 @@ func runServer(cmd *Command, args []string) bool {
}()
}
- if *isStartingMsgBroker {
+ if *isStartingMqBroker {
go func() {
time.Sleep(2 * time.Second)
- msgBrokerOptions.startQueueServer()
+ mqBrokerOptions.startQueueServer()
}()
}