aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/agent/agent_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/agent/agent_server.go')
-rw-r--r--weed/mq/agent/agent_server.go52
1 files changed, 52 insertions, 0 deletions
diff --git a/weed/mq/agent/agent_server.go b/weed/mq/agent/agent_server.go
new file mode 100644
index 000000000..6fc61bbdb
--- /dev/null
+++ b/weed/mq/agent/agent_server.go
@@ -0,0 +1,52 @@
+package agent
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
+ "google.golang.org/grpc"
+ "sync"
+)
+
+type SessionId int64
+type SessionEntry[T any] struct {
+ entry T
+ lastActiveTsNs int64
+}
+
+type MessageQueueAgentOptions struct {
+ SeedBrokers []pb.ServerAddress
+}
+
+type MessageQueueAgent struct {
+ mq_agent_pb.UnimplementedSeaweedMessagingAgentServer
+ option *MessageQueueAgentOptions
+ brokers []pb.ServerAddress
+ grpcDialOption grpc.DialOption
+ publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher]
+ publishersLock sync.RWMutex
+ subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]
+ subscribersLock sync.RWMutex
+}
+
+func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
+
+ // check masters to list all brokers
+
+ return &MessageQueueAgent{
+ option: option,
+ brokers: []pb.ServerAddress{},
+ grpcDialOption: grpcDialOption,
+ publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]),
+ subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]),
+ }
+}
+
+func (a *MessageQueueAgent) brokersList() []string {
+ var brokers []string
+ for _, broker := range a.brokers {
+ brokers = append(brokers, broker.String())
+ }
+ return brokers
+}