diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-03-09 23:49:42 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-03-09 23:49:42 -0700 |
| commit | 02773a61074d1a130419318d05d4d0b027cac4b4 (patch) | |
| tree | 590918137bc7edfd23653e377249c45145ec7e54 /weed/command | |
| parent | 14cb8a24c68ce3fd0d3df716295805a8c5c1b8ef (diff) | |
| download | seaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.tar.xz seaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.zip | |
Accumulated changes for message queue (#6600)
* rename
* set agent address
* refactor
* add agent sub
* pub messages
* grpc new client
* can publish records via agent
* send init message with session id
* fmt
* check cancelled request while waiting
* use sessionId
* handle possible nil stream
* subscriber process messages
* separate debug port
* use atomic int64
* less logs
* minor
* skip io.EOF
* rename
* remove unused
* use saved offsets
* do not reuse session, since always session id is new after restart
remove last active ts from SessionEntry
* simplify printing
* purge unused
* just proxy the subscription, skipping the session step
* adjust offset types
* subscribe offset type and possible value
* start after the known tsns
* avoid wrongly set startPosition
* move
* remove
* refactor
* typo
* fix
* fix changed path
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/command.go | 1 | ||||
| -rw-r--r-- | weed/command/mq_agent.go | 5 |
2 files changed, 4 insertions, 2 deletions
diff --git a/weed/command/command.go b/weed/command/command.go index 9fdf057e7..33cdb12d1 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -32,6 +32,7 @@ var Commands = []*Command{ cmdMaster, cmdMasterFollower, cmdMount, + cmdMqAgent, cmdMqBroker, cmdS3, cmdScaffold, diff --git a/weed/command/mq_agent.go b/weed/command/mq_agent.go index 2884c6531..4a59dcf33 100644 --- a/weed/command/mq_agent.go +++ b/weed/command/mq_agent.go @@ -26,12 +26,12 @@ type MessageQueueAgentOptions struct { func init() { cmdMqAgent.Run = runMqAgent // break init cycle mqAgentOptions.brokersString = cmdMqAgent.Flag.String("broker", "localhost:17777", "comma-separated message queue brokers") - mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "localhost", "message queue agent host address") + mqAgentOptions.ip = cmdMqAgent.Flag.String("ip", "", "message queue agent host address") mqAgentOptions.port = cmdMqAgent.Flag.Int("port", 16777, "message queue agent gRPC server port") } var cmdMqAgent = &Command{ - UsageLine: "mq.agent [-port=6377] [-master=<ip:port>]", + UsageLine: "mq.agent [-port=16777] [-master=<ip:port>]", Short: "<WIP> start a message queue agent", Long: `start a message queue agent @@ -64,6 +64,7 @@ func (mqAgentOpt *MessageQueueAgentOptions) startQueueAgent() bool { if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", *mqAgentOpt.port, err) } + glog.Infof("Start Seaweed Message Queue Agent on %s:%d", *mqAgentOpt.ip, *mqAgentOpt.port) grpcS := pb.NewGrpcServer() mq_agent_pb.RegisterSeaweedMessagingAgentServer(grpcS, agentServer) reflection.Register(grpcS) |
