aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/agent/agent_server.go
blob: 6fc61bbdb0f5f3983a562080195effa29cff9924 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package agent

import (
	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
	"google.golang.org/grpc"
	"sync"
)

// SessionId is the integer key type used by MessageQueueAgent to index its
// publisher and subscriber session maps.
type SessionId int64
// SessionEntry pairs a session value of any type T with an int64 activity
// marker. MessageQueueAgent stores pointers to these entries in its publisher
// and subscriber maps.
type SessionEntry[T any] struct {
	// entry is the wrapped session value.
	entry          T
	// lastActiveTsNs is presumably the time of last activity as a nanosecond
	// timestamp (going by the name); nothing in the visible code sets or reads
	// it — confirm against the callers.
	lastActiveTsNs int64
}

// MessageQueueAgentOptions holds the configuration handed to
// NewMessageQueueAgent.
type MessageQueueAgentOptions struct {
	// SeedBrokers is a list of broker addresses. The constructor stores the
	// options but does not read this field itself; presumably it is consumed
	// elsewhere to reach the brokers — verify against the callers.
	SeedBrokers []pb.ServerAddress
}

// MessageQueueAgent holds the state of a message queue agent: its options,
// the known broker addresses, the gRPC dial option, and the publisher and
// subscriber sessions keyed by SessionId.
//
// It embeds mq_agent_pb.UnimplementedSeaweedMessagingAgentServer; presumably
// that supplies fallback implementations of the agent gRPC service methods —
// confirm against the generated mq_agent_pb code.
type MessageQueueAgent struct {
	mq_agent_pb.UnimplementedSeaweedMessagingAgentServer
	// option is the configuration passed to NewMessageQueueAgent.
	option          *MessageQueueAgentOptions
	// brokers is the list of broker addresses reported by brokersList. The
	// constructor initializes it to an empty slice.
	brokers         []pb.ServerAddress
	// grpcDialOption is the dial option passed to NewMessageQueueAgent.
	grpcDialOption  grpc.DialOption
	// publishers maps a session id to its topic publisher entry.
	publishers      map[SessionId]*SessionEntry[*pub_client.TopicPublisher]
	// publishersLock is, going by its name, meant to guard publishers; the
	// locking code is not visible here — confirm at the use sites.
	publishersLock  sync.RWMutex
	// subscribers maps a session id to its topic subscriber entry.
	subscribers     map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]
	// subscribersLock is, going by its name, meant to guard subscribers; the
	// locking code is not visible here — confirm at the use sites.
	subscribersLock sync.RWMutex
}

// NewMessageQueueAgent builds a MessageQueueAgent that keeps the given options
// and gRPC dial option and starts with empty publisher and subscriber session
// maps. The broker list starts out empty: this constructor neither reads
// option.SeedBrokers nor contacts any server.
func NewMessageQueueAgent(option *MessageQueueAgentOptions, grpcDialOption grpc.DialOption) *MessageQueueAgent {
	agent := new(MessageQueueAgent)
	agent.option = option
	agent.grpcDialOption = grpcDialOption

	// Placeholder: check masters to list all brokers. Until that exists the
	// agent begins with an empty (non-nil) broker list.
	agent.brokers = make([]pb.ServerAddress, 0)

	agent.publishers = map[SessionId]*SessionEntry[*pub_client.TopicPublisher]{}
	agent.subscribers = map[SessionId]*SessionEntry[*sub_client.TopicSubscriber]{}
	return agent
}

// brokersList returns the agent's broker addresses rendered as strings, in the
// same order as a.brokers. The result is nil when the agent has no brokers.
func (a *MessageQueueAgent) brokersList() []string {
	var addresses []string
	for i := range a.brokers {
		addresses = append(addresses, a.brokers[i].String())
	}
	return addresses
}