aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/command/command.go4
-rw-r--r--weed/command/mq_broker.go94
-rw-r--r--weed/command/msg_broker.go109
-rw-r--r--weed/command/server.go32
5 files changed, 115 insertions, 126 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 9f18cc5b9..d600e32b5 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
diff --git a/weed/command/command.go b/weed/command/command.go
index 7635405dc..512cd6f8f 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -28,12 +28,12 @@ var Commands = []*Command{
cmdFilerSynchronize,
cmdFix,
cmdFuse,
+ cmdIam,
cmdMaster,
cmdMasterFollower,
cmdMount,
+ cmdMqBroker,
cmdS3,
- cmdIam,
- cmdMsgBroker,
cmdScaffold,
cmdServer,
cmdShell,
diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go
new file mode 100644
index 000000000..1b31d0141
--- /dev/null
+++ b/weed/command/mq_broker.go
@@ -0,0 +1,94 @@
+package command
+
+import (
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ mqBrokerStandaloneOptions MessageQueueBrokerOptions
+)
+
+type MessageQueueBrokerOptions struct {
+ masters map[string]pb.ServerAddress
+ mastersString *string
+ filerGroup *string
+ ip *string
+ port *int
+ dataCenter *string
+ rack *string
+ cpuprofile *string
+ memprofile *string
+}
+
+func init() {
+ cmdMqBroker.Run = runMqBroker // break init cycle
+ mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
+ mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
+ mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
+ mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
+ mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack")
+ mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file")
+ mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file")
+}
+
+var cmdMqBroker = &Command{
+ UsageLine: "mq.broker [-port=17777] [-master=<ip:port>]",
+ Short: "start a message queue broker",
+ Long: `start a message queue broker
+
+ The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
+ The brokers are stateless. To scale up, just add more brokers.
+
+`,
+}
+
+func runMqBroker(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap()
+
+ return mqBrokerStandaloneOptions.startQueueServer()
+
+}
+
+func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
+
+ grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
+
+ qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{
+ Masters: mqBrokerOpt.masters,
+ FilerGroup: *mqBrokerOpt.filerGroup,
+ DataCenter: *mqBrokerOpt.dataCenter,
+ Rack: *mqBrokerOpt.rack,
+ DefaultReplication: "",
+ MaxMB: 0,
+ Ip: *mqBrokerOpt.ip,
+ Port: *mqBrokerOpt.port,
+ }, grpcDialOption)
+
+ // start grpc listener
+ grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
+ mq_pb.RegisterSeaweedMessagingServer(grpcS, qs)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
deleted file mode 100644
index 3274f599b..000000000
--- a/weed/command/msg_broker.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package command
-
-import (
- "context"
- "fmt"
- "time"
-
- "google.golang.org/grpc/reflection"
-
- "github.com/chrislusf/seaweedfs/weed/util/grace"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- messageBrokerStandaloneOptions MessageBrokerOptions
-)
-
-type MessageBrokerOptions struct {
- filer *string
- ip *string
- port *int
- cpuprofile *string
- memprofile *string
-}
-
-func init() {
- cmdMsgBroker.Run = runMsgBroker // break init cycle
- messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
- messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
- messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
- messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
- messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
-}
-
-var cmdMsgBroker = &Command{
- UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]",
- Short: "start a message queue broker",
- Long: `start a message queue broker
-
- The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
- The brokers are stateless. To scale up, just add more brokers.
-
-`,
-}
-
-func runMsgBroker(cmd *Command, args []string) bool {
-
- util.LoadConfiguration("security", false)
-
- return messageBrokerStandaloneOptions.startQueueServer()
-
-}
-
-func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
-
- grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
-
- filerAddress := pb.ServerAddress(*msgBrokerOpt.filer)
-
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
- cipher := false
-
- for {
- err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
- }
- cipher = resp.Cipher
- return nil
- })
- if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
- time.Sleep(time.Second)
- } else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
- break
- }
- }
-
- qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
- Filers: []pb.ServerAddress{filerAddress},
- DefaultReplication: "",
- MaxMB: 0,
- Ip: *msgBrokerOpt.ip,
- Port: *msgBrokerOpt.port,
- Cipher: cipher,
- }, grpcDialOption)
-
- // start grpc listener
- grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0)
- if err != nil {
- glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
- }
- grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
- messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
- reflection.Register(grpcS)
- grpcS.Serve(grpcL)
-
- return true
-
-}
diff --git a/weed/command/server.go b/weed/command/server.go
index b1812bb9b..b993d9428 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -24,13 +24,13 @@ type ServerOptions struct {
}
var (
- serverOptions ServerOptions
- masterOptions MasterOptions
- filerOptions FilerOptions
- s3Options S3Options
- iamOptions IamOptions
- webdavOptions WebDavOption
- msgBrokerOptions MessageBrokerOptions
+ serverOptions ServerOptions
+ masterOptions MasterOptions
+ filerOptions FilerOptions
+ s3Options S3Options
+ iamOptions IamOptions
+ webdavOptions WebDavOption
+ mqBrokerOptions MessageQueueBrokerOptions
)
func init() {
@@ -74,7 +74,7 @@ var (
isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service")
isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway")
- isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
+ isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker")
serverWhiteList []string
@@ -155,7 +155,7 @@ func init() {
webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
- msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port")
+ mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port")
}
@@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool {
if *isStartingWebDav {
*isStartingFiler = true
}
- if *isStartingMsgBroker {
+ if *isStartingMqBroker {
*isStartingFiler = true
}
@@ -208,7 +208,9 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
- msgBrokerOptions.ip = serverIp
+ mqBrokerOptions.ip = serverIp
+ mqBrokerOptions.masters = filerOptions.masters
+ mqBrokerOptions.filerGroup = filerOptions.filerGroup
// serverOptions.v.pulseSeconds = pulseSeconds
// masterOptions.pulseSeconds = pulseSeconds
@@ -217,6 +219,8 @@ func runServer(cmd *Command, args []string) bool {
filerOptions.dataCenter = serverDataCenter
filerOptions.rack = serverRack
+ mqBrokerOptions.dataCenter = serverDataCenter
+ mqBrokerOptions.rack = serverRack
filerOptions.disableHttp = serverDisableHttp
masterOptions.disableHttp = serverDisableHttp
@@ -224,7 +228,7 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress
iamOptions.filer = &filerAddress
webdavOptions.filer = &filerAddress
- msgBrokerOptions.filer = &filerAddress
+ mqBrokerOptions.filerGroup = filerOptions.filerGroup
go stats_collect.StartMetricsServer(*serverMetricsHttpPort)
@@ -276,10 +280,10 @@ func runServer(cmd *Command, args []string) bool {
}()
}
- if *isStartingMsgBroker {
+ if *isStartingMqBroker {
go func() {
time.Sleep(2 * time.Second)
- msgBrokerOptions.startQueueServer()
+ mqBrokerOptions.startQueueServer()
}()
}