diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-10 01:36:23 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-28 23:24:38 -0700 |
| commit | 8060fdcac56bae36b53764d7ad23a142a865e67d (patch) | |
| tree | 319d671fa6628fcde7003f18293f8088ae90d187 /weed/mq/msgclient/publisher.go | |
| parent | f25e273e328a9959f4dcef13c5f78e427c0bf7a0 (diff) | |
| download | seaweedfs-8060fdcac56bae36b53764d7ad23a142a865e67d.tar.xz seaweedfs-8060fdcac56bae36b53764d7ad23a142a865e67d.zip | |
remove old code
Diffstat (limited to 'weed/mq/msgclient/publisher.go')
| -rw-r--r-- | weed/mq/msgclient/publisher.go | 118 |
1 files changed, 0 insertions, 118 deletions
diff --git a/weed/mq/msgclient/publisher.go b/weed/mq/msgclient/publisher.go deleted file mode 100644 index 823791d10..000000000 --- a/weed/mq/msgclient/publisher.go +++ /dev/null @@ -1,118 +0,0 @@ -package msgclient - -import ( - "context" - - "github.com/OneOfOne/xxhash" - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/mq/broker" - "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" -) - -type Publisher struct { - publishClients []mq_pb.SeaweedMessaging_PublishClient - topicConfiguration *mq_pb.TopicConfiguration - messageCount uint64 - publisherId string -} - -func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) { - // read topic configuration - topicConfiguration := &mq_pb.TopicConfiguration{ - PartitionCount: 4, - } - publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount) - for i := 0; i < int(topicConfiguration.PartitionCount); i++ { - tp := broker.TopicPartition{ - Namespace: namespace, - Topic: topic, - Partition: int32(i), - } - grpcClientConn, err := mc.findBroker(tp) - if err != nil { - return nil, err - } - client, err := setupPublisherClient(grpcClientConn, tp) - if err != nil { - return nil, err - } - publishClients[i] = client - } - return &Publisher{ - publishClients: publishClients, - topicConfiguration: topicConfiguration, - }, nil -} - -func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) { - - stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background()) - if err != nil { - return nil, err - } - - // send init message - err = stream.Send(&mq_pb.PublishRequest{ - Init: &mq_pb.PublishRequest_InitMessage{ - Namespace: tp.Namespace, - Topic: tp.Topic, - Partition: tp.Partition, - }, - }) - if err != nil { - return nil, err - } - - // process init response - initResponse, err := stream.Recv() - if err != nil { - return nil, err - } - if initResponse.Redirect != nil { - // TODO follow redirection - } - if initResponse.Config != nil { - } - - // setup looks for control messages - doneChan := make(chan error, 1) - go func() { - for { - in, err := stream.Recv() - if err != nil { - doneChan <- err - return - } - if in.Redirect != nil { - } - if in.Config != nil { - } - } - }() - - return stream, nil - -} - -func (p *Publisher) Publish(m *mq_pb.Message) error { - hashValue := p.messageCount - p.messageCount++ - if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash { - if m.Key != nil { - hashValue = xxhash.Checksum64(m.Key) - } - } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash { - hashValue = xxhash.Checksum64(m.Key) - } else { - // round robin - } - - idx := int(hashValue) % len(p.publishClients) - if idx < 0 { - idx += len(p.publishClients) - } - return p.publishClients[idx].Send(&mq_pb.PublishRequest{ - Data: m, - }) -} |
