aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/agent_client/publish_session.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/agent_client/publish_session.go')
-rw-r--r--weed/mq/client/agent_client/publish_session.go19
1 files changed, 15 insertions, 4 deletions
diff --git a/weed/mq/client/agent_client/publish_session.go b/weed/mq/client/agent_client/publish_session.go
index 45d46f553..c12d345a1 100644
--- a/weed/mq/client/agent_client/publish_session.go
+++ b/weed/mq/client/agent_client/publish_session.go
@@ -4,10 +4,10 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
)
type PublishSession struct {
@@ -15,13 +15,12 @@ type PublishSession struct {
partitionCount int
publisherName string
stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse]
- sessionId int64
}
func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) {
// call local agent grpc server to create a new session
- clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure())
+ clientConn, err := grpc.NewClient(agentAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err)
}
@@ -48,12 +47,17 @@ func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitio
return nil, fmt.Errorf("publish record: %v", err)
}
+ if err = stream.Send(&mq_agent_pb.PublishRecordRequest{
+ SessionId: resp.SessionId,
+ }); err != nil {
+ return nil, fmt.Errorf("send session id: %v", err)
+ }
+
return &PublishSession{
schema: topicSchema,
partitionCount: partitionCount,
publisherName: publisherName,
stream: stream,
- sessionId: resp.SessionId,
}, nil
}
@@ -68,3 +72,10 @@ func (a *PublishSession) CloseSession() error {
a.schema = nil
return err
}
+
+func (a *PublishSession) PublishMessageRecord(key []byte, record *schema_pb.RecordValue) error {
+ return a.stream.Send(&mq_agent_pb.PublishRecordRequest{
+ Key: key,
+ Value: record,
+ })
+}