aboutsummaryrefslogtreecommitdiff
path: root/weed/command/mq_agent.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-01-20 22:19:27 -0800
committerGitHub <noreply@github.com>2025-01-20 22:19:27 -0800
commitcc05874d06489751bc3892a2a012dde3921e9909 (patch)
tree7b8cc708e57af49bda2856bdece8c30029cadb05 /weed/command/mq_agent.go
parentb2f56d9add07bdd66b5a24f0cc3c4467a9ddfc66 (diff)
downloadseaweedfs-cc05874d06489751bc3892a2a012dde3921e9909.tar.xz
seaweedfs-cc05874d06489751bc3892a2a012dde3921e9909.zip
Add message queue agent (#6463)
* scaffold message queue agent * adjust proto, add mq_agent * add agent client implementation * remove unused function * agent publish server implementation * adding agent
Diffstat (limited to 'weed/command/mq_agent.go')
-rw-r--r--weed/command/mq_agent.go74
1 files changed, 74 insertions, 0 deletions
diff --git a/weed/command/mq_agent.go b/weed/command/mq_agent.go
new file mode 100644
index 000000000..2884c6531
--- /dev/null
+++ b/weed/command/mq_agent.go
@@ -0,0 +1,74 @@
+package command
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/agent"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
+ "google.golang.org/grpc/reflection"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+var (
+ mqAgentOptions MessageQueueAgentOptions
+)
+
+type MessageQueueAgentOptions struct {
+ brokers []pb.ServerAddress
+ brokersString *string
+ filerGroup *string
+ ip *string
+ port *int
+}
+
+func init() {
+ cmdMqAgent.Run = runMqAgent // break init cycle
+ mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers")
+ mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "localhost", "message queue agent host address")
+ mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port")
+}
+
+var cmdMqAgent = &Command{
+ UsageLine: "mq.agent [-port=6377] [-master=<ip:port>]",
+ Short: "<WIP> start a message queue agent",
+ Long: `start a message queue agent
+
+ The agent runs on local server to accept gRPC calls to write or read messages.
+ The messages are sent to message queue brokers.
+
+`,
+}
+
+func runMqAgent(cmd *Command, args []string) bool {
+
+ util.LoadSecurityConfiguration()
+
+ mqAgentOptions.brokers = pb.ServerAddresses(*mqAgentOptions.brokersString).ToAddresses()
+
+ return mqAgentOptions.startQueueAgent()
+
+}
+
+func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool {
+
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_agent")
+
+ agentServer := agent.NewMessageQueueAgent(&agent.MessageQueueAgentOptions{
+ SeedBrokers: mqAgentOpt.brokers,
+ }, grpcDialOption)
+
+ // start grpc listener
+ grpcL, _, err := util.NewIpAndLocalListeners(*mqAgentOpt.ip, *mqAgentOpt.port, 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err)
+ }
+ grpcS := pb.NewGrpcServer()
+ mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer)
+ reflection.Register(grpcS)
+ grpcS.Serve(grpcL)
+
+ return true
+
+}