diff options
Diffstat (limited to 'weed/mq/agent/agent_grpc_sub_session.go')
| -rw-r--r-- | weed/mq/agent/agent_grpc_sub_session.go | 57 |
1 files changed, 0 insertions, 57 deletions
diff --git a/weed/mq/agent/agent_grpc_sub_session.go b/weed/mq/agent/agent_grpc_sub_session.go deleted file mode 100644 index 17def9ed5..000000000 --- a/weed/mq/agent/agent_grpc_sub_session.go +++ /dev/null @@ -1,57 +0,0 @@ -package agent - -import ( - "context" - "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "math/rand/v2" - "time" -) - -func (a *MessageQueueAgent) StartSubscribeSession(ctx context.Context, req *mq_agent_pb.StartSubscribeSessionRequest) (*mq_agent_pb.StartSubscribeSessionResponse, error) { - sessionId := rand.Int64() - - subscriberConfig := &sub_client.SubscriberConfiguration{ - ConsumerGroup: req.ConsumerGroup, - ConsumerGroupInstanceId: req.ConsumerGroupInstanceId, - GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), - MaxPartitionCount: req.MaxSubscribedPartitions, - SlidingWindowSize: req.SlidingWindowSize, - } - - contentConfig := &sub_client.ContentConfiguration{ - Topic: topic.FromPbTopic(req.Topic), - Filter: req.Filter, - PartitionOffsets: req.PartitionOffsets, - } - - topicSubscriber := sub_client.NewTopicSubscriber( - a.brokersList(), - subscriberConfig, - contentConfig, - make(chan sub_client.KeyedOffset, 1024), - ) - - a.subscribersLock.Lock() - // remove inactive publishers to avoid memory leak - for k, entry := range a.subscribers { - if entry.lastActiveTsNs == 0 { - // this is an active session - continue - } - if time.Unix(0, entry.lastActiveTsNs).Add(10 * time.Hour).Before(time.Now()) { - delete(a.subscribers, k) - } - } - a.subscribers[SessionId(sessionId)] = &SessionEntry[*sub_client.TopicSubscriber]{ - entry: topicSubscriber, - } - a.subscribersLock.Unlock() - - return &mq_agent_pb.StartSubscribeSessionResponse{ - SessionId: sessionId, - }, nil -} |
