aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/msgclient/publisher.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-01 22:43:25 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:22:06 -0700
commit21b6b07dd8d0379d835f9d9c1259155a12f1e61b (patch)
treec3b13d69cac50afc227b1a06d34082cf3598f98a /weed/mq/msgclient/publisher.go
parent8c4edf7b4014b157ee269419febe57af9cd67618 (diff)
downloadseaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.tar.xz
seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.zip
renaming
Diffstat (limited to 'weed/mq/msgclient/publisher.go')
-rw-r--r--weed/mq/msgclient/publisher.go118
1 files changed, 118 insertions, 0 deletions
diff --git a/weed/mq/msgclient/publisher.go b/weed/mq/msgclient/publisher.go
new file mode 100644
index 000000000..823791d10
--- /dev/null
+++ b/weed/mq/msgclient/publisher.go
@@ -0,0 +1,118 @@
+package msgclient
+
+import (
+ "context"
+
+ "github.com/OneOfOne/xxhash"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
+)
+
+type Publisher struct {
+ publishClients []mq_pb.SeaweedMessaging_PublishClient
+ topicConfiguration *mq_pb.TopicConfiguration
+ messageCount uint64
+ publisherId string
+}
+
+func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
+ // read topic configuration
+ topicConfiguration := &mq_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ tp := broker.TopicPartition{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: int32(i),
+ }
+ grpcClientConn, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ client, err := setupPublisherClient(grpcClientConn, tp)
+ if err != nil {
+ return nil, err
+ }
+ publishClients[i] = client
+ }
+ return &Publisher{
+ publishClients: publishClients,
+ topicConfiguration: topicConfiguration,
+ }, nil
+}
+
+func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {
+
+ stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
+ if err != nil {
+ return nil, err
+ }
+
+ // send init message
+ err = stream.Send(&mq_pb.PublishRequest{
+ Init: &mq_pb.PublishRequest_InitMessage{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ // process init response
+ initResponse, err := stream.Recv()
+ if err != nil {
+ return nil, err
+ }
+ if initResponse.Redirect != nil {
+ // TODO follow redirection
+ }
+ if initResponse.Config != nil {
+ }
+
+ // setup looks for control messages
+ doneChan := make(chan error, 1)
+ go func() {
+ for {
+ in, err := stream.Recv()
+ if err != nil {
+ doneChan <- err
+ return
+ }
+ if in.Redirect != nil {
+ }
+ if in.Config != nil {
+ }
+ }
+ }()
+
+ return stream, nil
+
+}
+
+func (p *Publisher) Publish(m *mq_pb.Message) error {
+ hashValue := p.messageCount
+ p.messageCount++
+ if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
+ if m.Key != nil {
+ hashValue = xxhash.Checksum64(m.Key)
+ }
+ } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
+ hashValue = xxhash.Checksum64(m.Key)
+ } else {
+ // round robin
+ }
+
+ idx := int(hashValue) % len(p.publishClients)
+ if idx < 0 {
+ idx += len(p.publishClients)
+ }
+ return p.publishClients[idx].Send(&mq_pb.PublishRequest{
+ Data: m,
+ })
+}