diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-01-20 22:19:27 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-01-20 22:19:27 -0800 |
| commit | cc05874d06489751bc3892a2a012dde3921e9909 (patch) | |
| tree | 7b8cc708e57af49bda2856bdece8c30029cadb05 /weed/command/mq_agent.go | |
| parent | b2f56d9add07bdd66b5a24f0cc3c4467a9ddfc66 (diff) | |
| download | seaweedfs-cc05874d06489751bc3892a2a012dde3921e9909.tar.xz seaweedfs-cc05874d06489751bc3892a2a012dde3921e9909.zip | |
Add message queue agent (#6463)
* scaffold message queue agent
* adjust proto, add mq_agent
* add agent client implementation
* remove unused function
* agent publish server implementation
* adding agent
Diffstat (limited to 'weed/command/mq_agent.go')
| -rw-r--r-- | weed/command/mq_agent.go | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/weed/command/mq_agent.go b/weed/command/mq_agent.go new file mode 100644 index 000000000..2884c6531 --- /dev/null +++ b/weed/command/mq_agent.go @@ -0,0 +1,74 @@ +package command + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/agent" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "google.golang.org/grpc/reflection" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +var ( + mqAgentOptions MessageQueueAgentOptions +) + +type MessageQueueAgentOptions struct { + brokers []pb.ServerAddress + brokersString *string + filerGroup *string + ip *string + port *int +} + +func init() { + cmdMqAgent.Run = runMqAgent // break init cycle + mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers") + mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "localhost", "message queue agent host address") + mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port") +} + +var cmdMqAgent = &Command{ + UsageLine: "mq.agent [-port=6377] [-master=<ip:port>]", + Short: "<WIP> start a message queue agent", + Long: `start a message queue agent + + The agent runs on local server to accept gRPC calls to write or read messages. + The messages are sent to message queue brokers. + +`, +} + +func runMqAgent(cmd *Command, args []string) bool { + + util.LoadSecurityConfiguration() + + mqAgentOptions.brokers = pb.ServerAddresses(*mqAgentOptions.brokersString).ToAddresses() + + return mqAgentOptions.startQueueAgent() + +} + +func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool { + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent") + + agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{ + SeedBrokers: mqAgentOpt.brokers, + }, grpcDialOption) + + // start grpc listener + grpcL, _, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err) + } + grpcS := pb.NewGrpcServer() + mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} |
