diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-01 22:43:25 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-28 23:22:06 -0700 |
| commit | 21b6b07dd8d0379d835f9d9c1259155a12f1e61b (patch) | |
| tree | c3b13d69cac50afc227b1a06d34082cf3598f98a /weed/mq/msgclient/publisher.go | |
| parent | 8c4edf7b4014b157ee269419febe57af9cd67618 (diff) | |
| download | seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.tar.xz seaweedfs-21b6b07dd8d0379d835f9d9c1259155a12f1e61b.zip | |
renaming
Diffstat (limited to 'weed/mq/msgclient/publisher.go')
| -rw-r--r-- | weed/mq/msgclient/publisher.go | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/weed/mq/msgclient/publisher.go b/weed/mq/msgclient/publisher.go new file mode 100644 index 000000000..823791d10 --- /dev/null +++ b/weed/mq/msgclient/publisher.go @@ -0,0 +1,118 @@ +package msgclient + +import ( + "context" + + "github.com/OneOfOne/xxhash" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/mq/broker" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" +) + +type Publisher struct { + publishClients []mq_pb.SeaweedMessaging_PublishClient + topicConfiguration *mq_pb.TopicConfiguration + messageCount uint64 + publisherId string +} + +func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { + // read topic configuration + topicConfiguration := &mq_pb.TopicConfiguration{ + PartitionCount: 4, + } + publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) + for i := 0; i < int(topicConfiguration.PartitionCount); i++ { + tp := broker.TopicPartition{ + Namespace: namespace, + Topic: topic, + Partition: int32(i), + } + grpcClientConn, err := mc.findBroker(tp) + if err != nil { + return nil, err + } + client, err := setupPublisherClient(grpcClientConn, tp) + if err != nil { + return nil, err + } + publishClients[i] = client + } + return &Publisher{ + publishClients: publishClients, + topicConfiguration: topicConfiguration, + }, nil +} + +func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) { + + stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) + if err != nil { + return nil, err + } + + // send init message + err = stream.Send(&mq_pb.PublishRequest{ + Init: &mq_pb.PublishRequest_InitMessage{ + Namespace: tp.Namespace, + Topic: tp.Topic, + Partition: tp.Partition, + }, + }) + if err != nil { + return nil, err + } + + // process init response + initResponse, err := stream.Recv() + if err != nil { + return nil, err + } + if initResponse.Redirect != nil { + // TODO follow redirection + } + if initResponse.Config != nil { + } + + // setup looks for control messages + doneChan := make(chan error, 1) + go func() { + for { + in, err := stream.Recv() + if err != nil { + doneChan <- err + return + } + if in.Redirect != nil { + } + if in.Config != nil { + } + } + }() + + return stream, nil + +} + +func (p *Publisher) Publish(m *mq_pb.Message) error { + hashValue := p.messageCount + p.messageCount++ + if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash { + if m.Key != nil { + hashValue = xxhash.Checksum64(m.Key) + } + } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash { + hashValue = xxhash.Checksum64(m.Key) + } else { + // round robin + } + + idx := int(hashValue) % len(p.publishClients) + if idx < 0 { + idx += len(p.publishClients) + } + return p.publishClients[idx].Send(&mq_pb.PublishRequest{ + Data: m, + }) +} |
