aboutsummaryrefslogtreecommitdiff
path: root/weed/command/mq_broker.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-01 21:59:50 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:22:06 -0700
commit8ca7d1ef174da9b58bf3a83e53d41af27a01572a (patch)
tree24e5d7939a3df3a986cba4b2bae1181084448512 /weed/command/mq_broker.go
parent3e7e922d1691263bdda868d377f944f92b9480d2 (diff)
downloadseaweedfs-8ca7d1ef174da9b58bf3a83e53d41af27a01572a.tar.xz
seaweedfs-8ca7d1ef174da9b58bf3a83e53d41af27a01572a.zip
rename to SeaweedMQ
Diffstat (limited to 'weed/command/mq_broker.go')
-rw-r--r--weed/command/mq_broker.go109
1 files changed, 109 insertions, 0 deletions
diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go
new file mode 100644
index 000000000..a5a6e3566
--- /dev/null
+++ b/weed/command/mq_broker.go
@@ -0,0 +1,109 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc/reflection"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ mqBrokerStandaloneOptions MessageQueueBrokerOptions
+)
+
+type MessageQueueBrokerOptions struct {
+ filer *string
+ ip *string
+ port *int
+ cpuprofile *string
+ memprofile *string
+}
+
+func init() {
+ cmdMqBroker.Run = runMqBroker // break init cycle
+ mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address")
+ mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
+ mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
+ mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file")
+ mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file")
+}
+
+var cmdMqBroker = &Command{
+ UsageLine: "mq.broker [-port=17777] [-filer=<ip:port>]",
+ Short: "start a message queue broker",
+ Long: `start a message queue broker
+
+ The broker can accept gRPC calls to write or read messages. The messages are stored via filer.
+ The brokers are stateless. To scale up, just add more brokers.
+
+`,
+}
+
+func runMqBroker(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ return mqBrokerStandaloneOptions.startQueueServer()
+
+}
+
+func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
+
+ grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
+
+ filerAddress := pb.ServerAddress(*mqBrokerOpt.filer)
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
+ cipher := false
+
+ for {
+ err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
+ }
+ cipher = resp.Cipher
+ return nil
+ })
+ if err != nil {
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
+ time.Sleep(time.Second)
+ } else {
+ glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
+ break
+ }
+ }
+
+ qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
+ Filers: []pb.ServerAddress{filerAddress},
+ DefaultReplication: "",
+ MaxMB: 0,
+ Ip: *mqBrokerOpt.ip,
+ Port: *mqBrokerOpt.port,
+ Cipher: cipher,
+ }, grpcDialOption)
+
+ // start grpc listener
+ grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
+ messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}