aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/agent/agent_grpc_pub_session.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/agent/agent_grpc_pub_session.go')
-rw-r--r--weed/mq/agent/agent_grpc_pub_session.go61
1 files changed, 61 insertions, 0 deletions
diff --git a/weed/mq/agent/agent_grpc_pub_session.go b/weed/mq/agent/agent_grpc_pub_session.go
new file mode 100644
index 000000000..d5c6d0813
--- /dev/null
+++ b/weed/mq/agent/agent_grpc_pub_session.go
@@ -0,0 +1,61 @@
+package agent
+
+import (
+ "context"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
+ "log/slog"
+ "math/rand/v2"
+ "time"
+)
+
+func (a *MessageQueueAgent) StartPublishSession(ctx context.Context, req *mq_agent_pb.StartPublishSessionRequest) (*mq_agent_pb.StartPublishSessionResponse, error) {
+ sessionId := rand.Int64()
+
+ topicPublisher := pub_client.NewTopicPublisher(
+ &pub_client.PublisherConfiguration{
+ Topic: topic.NewTopic(req.Topic.Namespace, req.Topic.Name),
+ PartitionCount: req.PartitionCount,
+ Brokers: a.brokersList(),
+ PublisherName: req.PublisherName,
+ RecordType: req.RecordType,
+ })
+
+ a.publishersLock.Lock()
+ // remove inactive publishers to avoid memory leak
+ for k, entry := range a.publishers {
+ if entry.lastActiveTsNs == 0 {
+ // this is an active session
+ continue
+ }
+ if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) {
+ delete(a.publishers, k)
+ }
+ }
+ a.publishers[SessionId(sessionId)] = &SessionEntry[*pub_client.TopicPublisher]{
+ entry: topicPublisher,
+ }
+ a.publishersLock.Unlock()
+
+ return &mq_agent_pb.StartPublishSessionResponse{
+ SessionId: sessionId,
+ }, nil
+}
+
+func (a *MessageQueueAgent) ClosePublishSession(ctx context.Context, req *mq_agent_pb.ClosePublishSessionRequest) (*mq_agent_pb.ClosePublishSessionResponse, error) {
+ var finishErr string
+ a.publishersLock.Lock()
+ publisherEntry, found := a.publishers[SessionId(req.SessionId)]
+ if found {
+ if err := publisherEntry.entry.FinishPublish(); err != nil {
+ finishErr = err.Error()
+ slog.Warn("failed to finish publish", "error", err)
+ }
+ delete(a.publishers, SessionId(req.SessionId))
+ }
+ a.publishersLock.Unlock()
+ return &mq_agent_pb.ClosePublishSessionResponse{
+ Error: finishErr,
+ }, nil
+}