aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/msgclient/publisher.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-10 01:36:23 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:24:38 -0700
commit8060fdcac56bae36b53764d7ad23a142a865e67d (patch)
tree319d671fa6628fcde7003f18293f8088ae90d187 /weed/mq/msgclient/publisher.go
parentf25e273e328a9959f4dcef13c5f78e427c0bf7a0 (diff)
downloadseaweedfs-8060fdcac56bae36b53764d7ad23a142a865e67d.tar.xz
seaweedfs-8060fdcac56bae36b53764d7ad23a142a865e67d.zip
remove old code
Diffstat (limited to 'weed/mq/msgclient/publisher.go')
-rw-r--r--weed/mq/msgclient/publisher.go118
1 files changed, 0 insertions, 118 deletions
diff --git a/weed/mq/msgclient/publisher.go b/weed/mq/msgclient/publisher.go
deleted file mode 100644
index 823791d10..000000000
--- a/weed/mq/msgclient/publisher.go
+++ /dev/null
@@ -1,118 +0,0 @@
-package msgclient
-
-import (
- "context"
-
- "github.com/OneOfOne/xxhash"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/mq/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
-)
-
-type Publisher struct {
- publishClients []mq_pb.SeaweedMessaging_PublishClient
- topicConfiguration *mq_pb.TopicConfiguration
- messageCount uint64
- publisherId string
-}
-
-func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
- // read topic configuration
- topicConfiguration := &mq_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- tp := broker.TopicPartition{
- Namespace: namespace,
- Topic: topic,
- Partition: int32(i),
- }
- grpcClientConn, err := mc.findBroker(tp)
- if err != nil {
- return nil, err
- }
- client, err := setupPublisherClient(grpcClientConn, tp)
- if err != nil {
- return nil, err
- }
- publishClients[i] = client
- }
- return &Publisher{
- publishClients: publishClients,
- topicConfiguration: topicConfiguration,
- }, nil
-}
-
-func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {
-
- stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
- if err != nil {
- return nil, err
- }
-
- // send init message
- err = stream.Send(&mq_pb.PublishRequest{
- Init: &mq_pb.PublishRequest_InitMessage{
- Namespace: tp.Namespace,
- Topic: tp.Topic,
- Partition: tp.Partition,
- },
- })
- if err != nil {
- return nil, err
- }
-
- // process init response
- initResponse, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
- if initResponse.Config != nil {
- }
-
- // setup looks for control messages
- doneChan := make(chan error, 1)
- go func() {
- for {
- in, err := stream.Recv()
- if err != nil {
- doneChan <- err
- return
- }
- if in.Redirect != nil {
- }
- if in.Config != nil {
- }
- }
- }()
-
- return stream, nil
-
-}
-
-func (p *Publisher) Publish(m *mq_pb.Message) error {
- hashValue := p.messageCount
- p.messageCount++
- if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
- if m.Key != nil {
- hashValue = xxhash.Checksum64(m.Key)
- }
- } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
- hashValue = xxhash.Checksum64(m.Key)
- } else {
- // round robin
- }
-
- idx := int(hashValue) % len(p.publishClients)
- if idx < 0 {
- idx += len(p.publishClients)
- }
- return p.publishClients[idx].Send(&mq_pb.PublishRequest{
- Data: m,
- })
-}