diff options
Diffstat (limited to 'weed/mq/agent/agent_server.go')
| -rw-r--r-- | weed/mq/agent/agent_server.go | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/weed/mq/agent/agent_server.go b/weed/mq/agent/agent_server.go new file mode 100644 index 000000000..6fc61bbdb --- /dev/null +++ b/weed/mq/agent/agent_server.go @@ -0,0 +1,52 @@ +package agent + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "google.golang.org/grpc" + "sync" +) + +type SessionId int64 +type SessionEntry[T any] struct { + entry T + lastActiveTsNs int64 +} + +type MessageQueueAgentOptions struct { + SeedBrokers []pb.ServerAddress +} + +type MessageQueueAgent struct { + mq_agent_pb.UnimplementedSeaweedMessagingAgentServer + option *MessageQueueAgentOptions + brokers []pb.ServerAddress + grpcDialOption grpc.DialOption + publishers map[SessionId]*SessionEntry[*pub_client.TopicPublisher] + publishersLock sync.RWMutex + subscribers map[SessionId]*SessionEntry[*sub_client.TopicSubscriber] + subscribersLock sync.RWMutex +} + +func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent { + + // check masters to list all brokers + + return &MessageQueueAgent{ + option: option, + brokers: []pb.ServerAddress{}, + grpcDialOption: grpcDialOption, + publishers: make(map[SessionId]*SessionEntry[*pub_client.TopicPublisher]), + subscribers: make(map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]), + } +} + +func (a *MessageQueueAgent) brokersList() []string { + var brokers []string + for _, broker := range a.brokers { + brokers = append(brokers, broker.String()) + } + return brokers +} |
