diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/benchmark.go | 2 | ||||
| -rw-r--r-- | weed/command/command.go | 4 | ||||
| -rw-r--r-- | weed/command/mq_broker.go | 94 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 109 | ||||
| -rw-r--r-- | weed/command/server.go | 32 |
5 files changed, 115 insertions, 126 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 9f18cc5b9..d600e32b5 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() diff --git a/weed/command/command.go b/weed/command/command.go index 7635405dc..512cd6f8f 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -28,12 +28,12 @@ var Commands = []*Command{ cmdFilerSynchronize, cmdFix, cmdFuse, + cmdIam, cmdMaster, cmdMasterFollower, cmdMount, + cmdMqBroker, cmdS3, - cmdIam, - cmdMsgBroker, cmdScaffold, cmdServer, cmdShell, diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go new file mode 100644 index 000000000..1b31d0141 --- /dev/null +++ b/weed/command/mq_broker.go @@ -0,0 +1,94 @@ +package command + +import ( + "google.golang.org/grpc/reflection" + + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + mqBrokerStandaloneOptions MessageQueueBrokerOptions +) + +type MessageQueueBrokerOptions struct { + masters map[string]pb.ServerAddress + mastersString *string + filerGroup *string + ip *string + port *int + dataCenter *string + rack *string + cpuprofile *string + memprofile *string +} + +func init() { + cmdMqBroker.Run = runMqBroker // break init cycle + mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers") + mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") + mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") + mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") + mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center") + mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack") + mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") + mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file") +} + +var cmdMqBroker = &Command{ + UsageLine: "mq.broker [-port=17777] [-master=<ip:port>]", + Short: "start a message queue broker", + Long: `start a message queue broker + + The broker can accept gRPC calls to write or read messages. The messages are stored via filer. + The brokers are stateless. To scale up, just add more brokers. + +`, +} + +func runMqBroker(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap() + + return mqBrokerStandaloneOptions.startQueueServer() + +} + +func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { + + grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") + + qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{ + Masters: mqBrokerOpt.masters, + FilerGroup: *mqBrokerOpt.filerGroup, + DataCenter: *mqBrokerOpt.dataCenter, + Rack: *mqBrokerOpt.rack, + DefaultReplication: "", + MaxMB: 0, + Ip: *mqBrokerOpt.ip, + Port: *mqBrokerOpt.port, + }, grpcDialOption) + + // start grpc listener + grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) + mq_pb.RegisterSeaweedMessagingServer(grpcS, qs) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go deleted file mode 100644 index 3274f599b..000000000 --- a/weed/command/msg_broker.go +++ /dev/null @@ -1,109 +0,0 @@ -package command - -import ( - "context" - "fmt" - "time" - - "google.golang.org/grpc/reflection" - - "github.com/chrislusf/seaweedfs/weed/util/grace" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - messageBrokerStandaloneOptions MessageBrokerOptions -) - -type MessageBrokerOptions struct { - filer *string - ip *string - port *int - cpuprofile *string - memprofile *string -} - -func init() { - cmdMsgBroker.Run = runMsgBroker // break init cycle - messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") - messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") - messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port") - messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file") - messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file") -} - -var cmdMsgBroker = &Command{ - UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]", - Short: "start a message queue broker", - Long: `start a message queue broker - - The broker can accept gRPC calls to write or read messages. The messages are stored via filer. - The brokers are stateless. To scale up, just add more brokers. - -`, -} - -func runMsgBroker(cmd *Command, args []string) bool { - - util.LoadConfiguration("security", false) - - return messageBrokerStandaloneOptions.startQueueServer() - -} - -func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { - - grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) - - filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) - - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") - cipher := false - - for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) - } - cipher = resp.Cipher - return nil - }) - if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) - time.Sleep(time.Second) - } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) - break - } - } - - qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ - Filers: []pb.ServerAddress{filerAddress}, - DefaultReplication: "", - MaxMB: 0, - Ip: *msgBrokerOpt.ip, - Port: *msgBrokerOpt.port, - Cipher: cipher, - }, grpcDialOption) - - // start grpc listener - grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0) - if err != nil { - glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) - } - grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) - messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) - reflection.Register(grpcS) - grpcS.Serve(grpcL) - - return true - -} diff --git a/weed/command/server.go b/weed/command/server.go index b1812bb9b..b993d9428 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -24,13 +24,13 @@ type ServerOptions struct { } var ( - serverOptions ServerOptions - masterOptions MasterOptions - filerOptions FilerOptions - s3Options S3Options - iamOptions IamOptions - webdavOptions WebDavOption - msgBrokerOptions MessageBrokerOptions + serverOptions ServerOptions + masterOptions MasterOptions + filerOptions FilerOptions + s3Options S3Options + iamOptions IamOptions + webdavOptions WebDavOption + mqBrokerOptions MessageQueueBrokerOptions ) func init() { @@ -74,7 +74,7 @@ var ( isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service") isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway") - isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") + isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker") serverWhiteList []string @@ -155,7 +155,7 @@ func init() { webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") - msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") + mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port") } @@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool { if *isStartingWebDav { *isStartingFiler = true } - if *isStartingMsgBroker { + if *isStartingMqBroker { *isStartingFiler = true } @@ -208,7 +208,9 @@ func runServer(cmd *Command, args []string) bool { serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack - msgBrokerOptions.ip = serverIp + mqBrokerOptions.ip = serverIp + mqBrokerOptions.masters = filerOptions.masters + mqBrokerOptions.filerGroup = filerOptions.filerGroup // serverOptions.v.pulseSeconds = pulseSeconds // masterOptions.pulseSeconds = pulseSeconds @@ -217,6 +219,8 @@ func runServer(cmd *Command, args []string) bool { filerOptions.dataCenter = serverDataCenter filerOptions.rack = serverRack + mqBrokerOptions.dataCenter = serverDataCenter + mqBrokerOptions.rack = serverRack filerOptions.disableHttp = serverDisableHttp masterOptions.disableHttp = serverDisableHttp @@ -224,7 +228,7 @@ func runServer(cmd *Command, args []string) bool { s3Options.filer = &filerAddress iamOptions.filer = &filerAddress webdavOptions.filer = &filerAddress - msgBrokerOptions.filer = &filerAddress + mqBrokerOptions.filerGroup = filerOptions.filerGroup go stats_collect.StartMetricsServer(*serverMetricsHttpPort) @@ -276,10 +280,10 @@ func runServer(cmd *Command, args []string) bool { }() } - if *isStartingMsgBroker { + if *isStartingMqBroker { go func() { time.Sleep(2 * time.Second) - msgBrokerOptions.startQueueServer() + mqBrokerOptions.startQueueServer() }() } |
