diff options
Diffstat (limited to 'weed/mq/client/agent_client/publish_session.go')
| -rw-r--r-- | weed/mq/client/agent_client/publish_session.go | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/weed/mq/client/agent_client/publish_session.go b/weed/mq/client/agent_client/publish_session.go index 45d46f553..c12d345a1 100644 --- a/weed/mq/client/agent_client/publish_session.go +++ b/weed/mq/client/agent_client/publish_session.go @@ -4,10 +4,10 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/mq/schema" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) type PublishSession struct { @@ -15,13 +15,12 @@ type PublishSession struct { partitionCount int publisherName string stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse] - sessionId int64 } func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) { // call local agent grpc server to create a new session - clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure()) + clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) } @@ -48,12 +47,17 @@ func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitio return nil, fmt.Errorf("publish record: %v", err) } + if err = stream.Send(&mq_agent_pb.PublishRecordRequest{ + SessionId: resp.SessionId, + }); err != nil { + return nil, fmt.Errorf("send session id: %v", err) + } + return &PublishSession{ schema: topicSchema, partitionCount: partitionCount, publisherName: publisherName, stream: stream, - sessionId: resp.SessionId, }, nil } @@ -68,3 +72,10 @@ func (a *PublishSession) CloseSession() error { a.schema = nil return err } + +func (a *PublishSession) PublishMessageRecord(key []byte, record *schema_pb.RecordValue) error { + return a.stream.Send(&mq_agent_pb.PublishRecordRequest{ + Key: key, + Value: record, + }) +} |
