diff options
Diffstat (limited to 'weed/command/mq_broker.go')
| -rw-r--r-- | weed/command/mq_broker.go | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go new file mode 100644 index 000000000..a5a6e3566 --- /dev/null +++ b/weed/command/mq_broker.go @@ -0,0 +1,109 @@ +package command + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc/reflection" + + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/messaging/broker" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + mqBrokerStandaloneOptions MessageQueueBrokerOptions +) + +type MessageQueueBrokerOptions struct { + filer *string + ip *string + port *int + cpuprofile *string + memprofile *string +} + +func init() { + cmdMqBroker.Run = runMqBroker // break init cycle + mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address") + mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") + mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") + mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") + mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file") +} + +var cmdMqBroker = &Command{ + UsageLine: "mq.broker [-port=17777] [-filer=<ip:port>]", + Short: "start a message queue broker", + Long: `start a message queue broker + + The broker can accept gRPC calls to write or read messages. The messages are stored via filer. + The brokers are stateless. To scale up, just add more brokers. + +`, +} + +func runMqBroker(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + return mqBrokerStandaloneOptions.startQueueServer() + +} + +func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { + + grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) + + filerAddress := pb.ServerAddress(*mqBrokerOpt.filer) + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") + cipher := false + + for { + err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) + } + cipher = resp.Cipher + return nil + }) + if err != nil { + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) + time.Sleep(time.Second) + } else { + glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress()) + break + } + } + + qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ + Filers: []pb.ServerAddress{filerAddress}, + DefaultReplication: "", + MaxMB: 0, + Ip: *mqBrokerOpt.ip, + Port: *mqBrokerOpt.port, + Cipher: cipher, + }, grpcDialOption) + + // start grpc listener + grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) + messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} |
