diff options
Diffstat (limited to 'weed/mq/client/agent_client')
| -rw-r--r-- | weed/mq/client/agent_client/SubscribeSession.go | 63 | ||||
| -rw-r--r-- | weed/mq/client/agent_client/agent_publish.go | 14 | ||||
| -rw-r--r-- | weed/mq/client/agent_client/agent_subscribe.go | 17 | ||||
| -rw-r--r-- | weed/mq/client/agent_client/publish_session.go | 70 |
4 files changed, 164 insertions, 0 deletions
diff --git a/weed/mq/client/agent_client/SubscribeSession.go b/weed/mq/client/agent_client/SubscribeSession.go new file mode 100644 index 000000000..fc87e17ca --- /dev/null +++ b/weed/mq/client/agent_client/SubscribeSession.go @@ -0,0 +1,63 @@ +package agent_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "google.golang.org/grpc" +) + +type SubscribeOption struct { + ConsumerGroup string + ConsumerGroupInstanceId string + Topic topic.Topic + Filter string + MaxSubscribedPartitions int32 + PerPartitionConcurrency int32 +} + +type SubscribeSession struct { + Option *SubscribeOption + stream grpc.BidiStreamingClient[mq_agent_pb.SubscribeRecordRequest, mq_agent_pb.SubscribeRecordResponse] + sessionId int64 +} + +func NewSubscribeSession(agentAddress string, option *SubscribeOption) (*SubscribeSession, error) { + // call local agent grpc server to create a new session + clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) + } + agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn) + + resp, err := agentClient.StartSubscribeSession(context.Background(), &mq_agent_pb.StartSubscribeSessionRequest{ + ConsumerGroup: option.ConsumerGroup, + ConsumerGroupInstanceId: option.ConsumerGroupInstanceId, + Topic: &schema_pb.Topic{ + Namespace: option.Topic.Namespace, + Name: option.Topic.Name, + }, + MaxSubscribedPartitions: option.MaxSubscribedPartitions, + Filter: option.Filter, + }) + if err != nil { + return nil, err + } + if resp.Error != "" { + return nil, fmt.Errorf("start subscribe session: %v", resp.Error) + } + + stream, err := agentClient.SubscribeRecord(context.Background()) + if err != nil { + return nil, fmt.Errorf("subscribe record: %v", err) + } + + return &SubscribeSession{ + Option: option, + stream: stream, + sessionId: resp.SessionId, + }, nil +} diff --git a/weed/mq/client/agent_client/agent_publish.go b/weed/mq/client/agent_client/agent_publish.go new file mode 100644 index 000000000..3e16f3b99 --- /dev/null +++ b/weed/mq/client/agent_client/agent_publish.go @@ -0,0 +1,14 @@ +package agent_client + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func (a *PublishSession) PublishMessageRecord(key []byte, record *schema_pb.RecordValue) error { + return a.stream.Send(&mq_agent_pb.PublishRecordRequest{ + SessionId: a.sessionId, + Key: key, + Value: record, + }) +} diff --git a/weed/mq/client/agent_client/agent_subscribe.go b/weed/mq/client/agent_client/agent_subscribe.go new file mode 100644 index 000000000..626a3a123 --- /dev/null +++ b/weed/mq/client/agent_client/agent_subscribe.go @@ -0,0 +1,17 @@ +package agent_client + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func (a *SubscribeSession) SubscribeMessageRecord( + onEachMessageFn func(key []byte, record *schema_pb.RecordValue), + onCompletionFn func()) error { + for { + resp, err := a.stream.Recv() + if err != nil { + return err + } + onEachMessageFn(resp.Key, resp.Value) + } +} diff --git a/weed/mq/client/agent_client/publish_session.go b/weed/mq/client/agent_client/publish_session.go new file mode 100644 index 000000000..45d46f553 --- /dev/null +++ b/weed/mq/client/agent_client/publish_session.go @@ -0,0 +1,70 @@ +package agent_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "google.golang.org/grpc" +) + +type PublishSession struct { + schema *schema.Schema + partitionCount int + publisherName string + stream grpc.BidiStreamingClient[mq_agent_pb.PublishRecordRequest, mq_agent_pb.PublishRecordResponse] + sessionId int64 +} + +func NewPublishSession(agentAddress string, topicSchema *schema.Schema, partitionCount int, publisherName string) (*PublishSession, error) { + + // call local agent grpc server to create a new session + clientConn, err := pb.GrpcDial(context.Background(), agentAddress, true, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("dial agent server %s: %v", agentAddress, err) + } + agentClient := mq_agent_pb.NewSeaweedMessagingAgentClient(clientConn) + + resp, err := agentClient.StartPublishSession(context.Background(), &mq_agent_pb.StartPublishSessionRequest{ + Topic: &schema_pb.Topic{ + Namespace: topicSchema.Namespace, + Name: topicSchema.Name, + }, + PartitionCount: int32(partitionCount), + RecordType: topicSchema.RecordType, + PublisherName: publisherName, + }) + if err != nil { + return nil, err + } + if resp.Error != "" { + return nil, fmt.Errorf("start publish session: %v", resp.Error) + } + + stream, err := agentClient.PublishRecord(context.Background()) + if err != nil { + return nil, fmt.Errorf("publish record: %v", err) + } + + return &PublishSession{ + schema: topicSchema, + partitionCount: partitionCount, + publisherName: publisherName, + stream: stream, + sessionId: resp.SessionId, + }, nil +} + +func (a *PublishSession) CloseSession() error { + if a.schema == nil { + return nil + } + err := a.stream.CloseSend() + if err != nil { + return fmt.Errorf("close send: %v", err) + } + a.schema = nil + return err +} |
