diff options
Diffstat (limited to 'weed/mq/client/agent_client/publish_session.go')
| -rw-r--r-- | weed/mq/client/agent_client/publish_session.go | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/weed/mq/client/agent_client/publish_session.go b/weed/mq/client/agent_client/publish_session.go new file mode 100644 index 000000000..45d46f553 --- /dev/null +++ b/weed/mq/client/agent_client/publish_session.go @@ -0,0 +1,70 @@ +package agent_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "google.golang.org/grpc" +) + +type PublishSession struct { + schema *schema.Schema + partitionCount int + publisherName string + stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse] + sessionId int64 +} + +func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) { + + // call local agent grpc server to create a new session + clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) + } + agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn) + + resp, err := agentClient.StartPublishSession(context.Background(), &mq_agent_pb.StartPublishSessionRequest{ + Topic: &schema_pb.Topic{ + Namespace: topicSchema.Namespace, + Name: topicSchema.Name, + }, + PartitionCount: int32(partitionCount), + RecordType: topicSchema.RecordType, + PublisherName: publisherName, + }) + if err != nil { + return nil, err + } + if resp.Error != "" { + return nil, fmt.Errorf("start publish session: %v", resp.Error) + } + + stream, err := agentClient.PublishRecord(context.Background()) + if err != nil { + return nil, fmt.Errorf("publish record: %v", err) + } + + return &PublishSession{ + schema: topicSchema, + partitionCount: partitionCount, + publisherName: publisherName, + stream: stream, + sessionId: resp.SessionId, + }, nil +} + +func (a *PublishSession) CloseSession() error { + if a.schema == nil { + return nil + } + err := a.stream.CloseSend() + if err != nil { + return fmt.Errorf("close send: %v", err) + } + a.schema = nil + return err +} |
